# for a message slice. This needs to be below Akka's maximum-frame-size and defaults to 480KiB.
maximum-message-slice-size=491520
-# Enable tell-based protocol between frontend (applications) and backend (shards). Using this protocol
-# should avoid AskTimeoutExceptions seen under heavy load. Defaults to false (use tell-based protocol).
-# Set to false to enable ask-based protocol.
-# This option is obsolete and will be removed in the next major release.
-use-tell-based-protocol=true
-
# Tune the maximum number of entries a follower is allowed to lag behind the leader before it is
# considered out-of-sync. This flag may require tuning in face of a large number of small transactions.
#sync-index-threshold=10
import org.eclipse.jdt.annotation.NonNull;
import org.opendaylight.mdsal.binding.dom.codec.api.BindingNormalizedNodeSerializer;
import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.config.distributed.datastore.provider.rev140612.DataStorePropertiesContainer;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.config.distributed.datastore.provider.rev231229.DataStorePropertiesContainer;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
+++ /dev/null
-/*
- * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-package org.opendaylight.controller.cluster.datastore;
-
-import com.google.common.util.concurrent.Futures;
-import com.google.common.util.concurrent.ListenableFuture;
-import java.util.List;
-import org.eclipse.jdt.annotation.NonNull;
-import org.opendaylight.mdsal.dom.spi.store.DOMStoreThreePhaseCommitCohort;
-import org.opendaylight.yangtools.yang.common.Empty;
-import scala.concurrent.Future;
-
-/**
- * Abstract base class for {@link DOMStoreThreePhaseCommitCohort} instances returned by this
- * implementation. In addition to the usual set of methods it also contains the list of actor
- * futures.
- */
-public abstract class AbstractThreePhaseCommitCohort<T> implements DOMStoreThreePhaseCommitCohort {
- protected static final @NonNull ListenableFuture<Empty> IMMEDIATE_EMPTY_SUCCESS =
- Futures.immediateFuture(Empty.value());
- protected static final @NonNull ListenableFuture<Boolean> IMMEDIATE_BOOLEAN_SUCCESS =
- Futures.immediateFuture(Boolean.TRUE);
-
- abstract List<Future<T>> getCohortFutures();
-}
+++ /dev/null
-/*
- * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-package org.opendaylight.controller.cluster.datastore;
-
-import static java.util.Objects.requireNonNull;
-
-import akka.actor.ActorSelection;
-import java.util.Collection;
-import java.util.Optional;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.atomic.AtomicLongFieldUpdater;
-import org.eclipse.jdt.annotation.NonNull;
-import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier;
-import org.opendaylight.controller.cluster.access.concepts.MemberName;
-import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
-import org.opendaylight.controller.cluster.datastore.messages.PrimaryShardInfo;
-import org.opendaylight.controller.cluster.datastore.utils.ActorUtils;
-import org.opendaylight.mdsal.dom.spi.store.DOMStoreReadTransaction;
-import org.opendaylight.mdsal.dom.spi.store.DOMStoreReadWriteTransaction;
-import org.opendaylight.mdsal.dom.spi.store.DOMStoreWriteTransaction;
-import org.opendaylight.yangtools.yang.data.tree.api.ReadOnlyDataTree;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import scala.concurrent.Future;
-import scala.util.Try;
-
-/**
- * Factory for creating local and remote TransactionContext instances. Maintains a cache of known local
- * transaction factories.
- */
-abstract class AbstractTransactionContextFactory<F extends LocalTransactionFactory> implements AutoCloseable {
- private static final Logger LOG = LoggerFactory.getLogger(AbstractTransactionContextFactory.class);
- @SuppressWarnings("rawtypes")
- private static final AtomicLongFieldUpdater<AbstractTransactionContextFactory> TX_COUNTER_UPDATER =
- AtomicLongFieldUpdater.newUpdater(AbstractTransactionContextFactory.class, "nextTx");
-
- private final ConcurrentMap<String, F> knownLocal = new ConcurrentHashMap<>();
- private final @NonNull LocalHistoryIdentifier historyId;
- private final @NonNull ActorUtils actorUtils;
-
- // Used via TX_COUNTER_UPDATER
- @SuppressWarnings("unused")
- private volatile long nextTx;
-
- protected AbstractTransactionContextFactory(final ActorUtils actorUtils, final LocalHistoryIdentifier historyId) {
- this.actorUtils = requireNonNull(actorUtils);
- this.historyId = requireNonNull(historyId);
- }
-
- final ActorUtils getActorUtils() {
- return actorUtils;
- }
-
- final LocalHistoryIdentifier getHistoryId() {
- return historyId;
- }
-
- @SuppressWarnings("checkstyle:IllegalCatch")
- private TransactionContext maybeCreateLocalTransactionContext(final TransactionProxy parent,
- final String shardName) {
- final LocalTransactionFactory local = knownLocal.get(shardName);
- if (local != null) {
- LOG.debug("Tx {} - Creating local component for shard {} using factory {}", parent.getIdentifier(),
- shardName, local);
-
- try {
- return createLocalTransactionContext(local, parent);
- } catch (Exception e) {
- return new NoOpTransactionContext(e, parent.getIdentifier());
- }
- }
-
- return null;
- }
-
- private AbstractTransactionContextWrapper maybeCreateDirectTransactionContextWrapper(
- final PrimaryShardInfo primaryShardInfo, final TransactionProxy parent,
- final String shardName, final DelayedTransactionContextWrapper transactionContextWrapper) {
- LOG.debug("Tx {}: Found primary {} for shard {}, trying to use DirectTransactionContextWrapper",
- parent.getIdentifier(), primaryShardInfo.getPrimaryShardActor(), shardName);
-
- updateShardInfo(shardName, primaryShardInfo);
-
- final TransactionContext localContext = maybeCreateLocalTransactionContext(parent, shardName);
- try {
- if (localContext != null) {
- LOG.debug("Tx {}: Local transaction context created successfully, using DirectTransactionWrapper",
- parent.getIdentifier());
- return new DirectTransactionContextWrapper(parent.getIdentifier(), actorUtils, shardName,
- localContext);
- }
-
- LOG.debug("Tx {}: Local transaction context creation failed, using DelayedTransactionWrapper",
- parent.getIdentifier());
- final RemoteTransactionContextSupport remote = new RemoteTransactionContextSupport(
- transactionContextWrapper, parent, shardName);
- remote.setPrimaryShard(primaryShardInfo);
- return transactionContextWrapper;
- } finally {
- onTransactionContextCreated(parent.getIdentifier());
- }
- }
-
- private void onFindPrimaryShardSuccess(final PrimaryShardInfo primaryShardInfo, final TransactionProxy parent,
- final String shardName, final DelayedTransactionContextWrapper transactionContextWrapper) {
- LOG.debug("Tx {}: Found primary {} for shard {}", parent.getIdentifier(),
- primaryShardInfo.getPrimaryShardActor(), shardName);
-
- updateShardInfo(shardName, primaryShardInfo);
-
- final TransactionContext localContext = maybeCreateLocalTransactionContext(parent, shardName);
- try {
- if (localContext != null) {
- transactionContextWrapper.executePriorTransactionOperations(localContext);
- } else {
- final RemoteTransactionContextSupport remote = new RemoteTransactionContextSupport(
- transactionContextWrapper, parent, shardName);
- remote.setPrimaryShard(primaryShardInfo);
- }
- } finally {
- onTransactionContextCreated(parent.getIdentifier());
- }
- }
-
- private void onFindPrimaryShardFailure(final Throwable failure, final TransactionProxy parent,
- final String shardName, final DelayedTransactionContextWrapper transactionContextWrapper) {
- LOG.debug("Tx {}: Find primary for shard {} failed", parent.getIdentifier(), shardName, failure);
-
- try {
- transactionContextWrapper.executePriorTransactionOperations(
- new NoOpTransactionContext(failure, parent.getIdentifier()));
- } finally {
- onTransactionContextCreated(parent.getIdentifier());
- }
- }
-
- final AbstractTransactionContextWrapper newTransactionContextWrapper(final TransactionProxy parent,
- final String shardName) {
- final DelayedTransactionContextWrapper contextWrapper = new DelayedTransactionContextWrapper(
- parent.getIdentifier(), actorUtils, shardName);
- final Future<PrimaryShardInfo> findPrimaryFuture = findPrimaryShard(shardName, parent.getIdentifier());
- if (findPrimaryFuture.isCompleted()) {
- final Try<PrimaryShardInfo> maybe = findPrimaryFuture.value().get();
- if (maybe.isSuccess()) {
- return maybeCreateDirectTransactionContextWrapper(maybe.get(), parent, shardName, contextWrapper);
- }
-
- onFindPrimaryShardFailure(maybe.failed().get(), parent, shardName, contextWrapper);
- } else {
- findPrimaryFuture.onComplete(result -> {
- if (result.isSuccess()) {
- onFindPrimaryShardSuccess(result.get(), parent, shardName, contextWrapper);
- } else {
- onFindPrimaryShardFailure(result.failed().get(), parent, shardName, contextWrapper);
- }
- return null;
- }, actorUtils.getClientDispatcher());
- }
- return contextWrapper;
- }
-
- private void updateShardInfo(final String shardName, final PrimaryShardInfo primaryShardInfo) {
- final Optional<ReadOnlyDataTree> maybeDataTree = primaryShardInfo.getLocalShardDataTree();
- if (maybeDataTree.isPresent()) {
- if (!knownLocal.containsKey(shardName)) {
- LOG.debug("Shard {} resolved to local data tree - adding local factory", shardName);
-
- F factory = factoryForShard(shardName, primaryShardInfo.getPrimaryShardActor(),
- maybeDataTree.orElseThrow());
- knownLocal.putIfAbsent(shardName, factory);
- }
- } else if (knownLocal.containsKey(shardName)) {
- LOG.debug("Shard {} invalidating local data tree", shardName);
-
- knownLocal.remove(shardName);
- }
- }
-
- protected final MemberName getMemberName() {
- return historyId.getClientId().getFrontendId().getMemberName();
- }
-
- /**
- * Create an identifier for the next TransactionProxy attached to this component
- * factory.
- * @return Transaction identifier, may not be null.
- */
- protected final TransactionIdentifier nextIdentifier() {
- return new TransactionIdentifier(historyId, TX_COUNTER_UPDATER.getAndIncrement(this));
- }
-
- /**
- * Find the primary shard actor.
- *
- * @param shardName Shard name
- * @return Future containing shard information.
- */
- protected abstract Future<PrimaryShardInfo> findPrimaryShard(@NonNull String shardName,
- @NonNull TransactionIdentifier txId);
-
- /**
- * Create local transaction factory for specified shard, backed by specified shard leader
- * and data tree instance.
- *
- * @param shardName the shard name
- * @param shardLeader the shard leader
- * @param dataTree Backing data tree instance. The data tree may only be accessed in
- * read-only manner.
- * @return Transaction factory for local use.
- */
- protected abstract F factoryForShard(String shardName, ActorSelection shardLeader, ReadOnlyDataTree dataTree);
-
- /**
- * Callback invoked from child transactions to push any futures, which need to
- * be waited for before the next transaction is allocated.
- * @param cohortFutures Collection of futures
- */
- protected abstract <T> void onTransactionReady(@NonNull TransactionIdentifier transaction,
- @NonNull Collection<Future<T>> cohortFutures);
-
- /**
- * Callback invoked when the internal TransactionContext has been created for a transaction.
- *
- * @param transactionId the ID of the transaction.
- */
- protected abstract void onTransactionContextCreated(@NonNull TransactionIdentifier transactionId);
-
- private static TransactionContext createLocalTransactionContext(final LocalTransactionFactory factory,
- final TransactionProxy parent) {
- return switch (parent.getType()) {
- case READ_ONLY -> {
- final DOMStoreReadTransaction readOnly = factory.newReadOnlyTransaction(parent.getIdentifier());
- yield new LocalTransactionContext(readOnly, parent.getIdentifier(), factory) {
- @Override
- DOMStoreWriteTransaction getWriteDelegate() {
- throw new UnsupportedOperationException();
- }
-
- @Override
- DOMStoreReadTransaction getReadDelegate() {
- return readOnly;
- }
- };
- }
- case READ_WRITE -> {
- final DOMStoreReadWriteTransaction readWrite = factory.newReadWriteTransaction(parent.getIdentifier());
- yield new LocalTransactionContext(readWrite, parent.getIdentifier(), factory) {
- @Override
- DOMStoreWriteTransaction getWriteDelegate() {
- return readWrite;
- }
-
- @Override
- DOMStoreReadTransaction getReadDelegate() {
- return readWrite;
- }
- };
- }
- case WRITE_ONLY -> {
- final DOMStoreWriteTransaction writeOnly = factory.newWriteOnlyTransaction(parent.getIdentifier());
- yield new LocalTransactionContext(writeOnly, parent.getIdentifier(), factory) {
- @Override
- DOMStoreWriteTransaction getWriteDelegate() {
- return writeOnly;
- }
-
- @Override
- DOMStoreReadTransaction getReadDelegate() {
- throw new UnsupportedOperationException();
- }
- };
- }
- default -> throw new IllegalArgumentException("Invalid transaction type: " + parent.getType());
- };
- }
-}
+++ /dev/null
-/*
- * Copyright (c) 2021 PANTHEON.tech, s.r.o. and others. All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-package org.opendaylight.controller.cluster.datastore;
-
-import static java.util.Objects.requireNonNull;
-
-import akka.actor.ActorSelection;
-import java.util.Optional;
-import java.util.SortedSet;
-import java.util.concurrent.TimeUnit;
-import org.eclipse.jdt.annotation.NonNull;
-import org.eclipse.jdt.annotation.Nullable;
-import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
-import org.opendaylight.controller.cluster.datastore.utils.ActorUtils;
-import scala.concurrent.Future;
-
-/**
- * A helper class that wraps an eventual TransactionContext instance. We have two specializations:
- * <ul>
- * <li>{@link DelayedTransactionContextWrapper}, which enqueues operations towards the backend</li>
- * <li>{@link DirectTransactionContextWrapper}, which sends operations to the backend</li>
- * </ul>
- */
-abstract class AbstractTransactionContextWrapper {
- private final TransactionIdentifier identifier;
- private final OperationLimiter limiter;
- private final String shardName;
-
- AbstractTransactionContextWrapper(@NonNull final TransactionIdentifier identifier,
- @NonNull final ActorUtils actorUtils, @NonNull final String shardName) {
- this.identifier = requireNonNull(identifier);
- this.shardName = requireNonNull(shardName);
- limiter = new OperationLimiter(identifier,
- // 1 extra permit for the ready operation
- actorUtils.getDatastoreContext().getShardBatchedModificationCount() + 1,
- TimeUnit.MILLISECONDS.toSeconds(actorUtils.getDatastoreContext().getOperationTimeoutInMillis()));
- }
-
- final TransactionIdentifier getIdentifier() {
- return identifier;
- }
-
- final OperationLimiter getLimiter() {
- return limiter;
- }
-
- final String getShardName() {
- return shardName;
- }
-
- abstract @Nullable TransactionContext getTransactionContext();
-
- /**
- * Either enqueue or execute specified operation.
- *
- * @param op Operation to (eventually) execute
- */
- abstract void maybeExecuteTransactionOperation(TransactionOperation op);
-
- /**
- * Mark the transaction as ready.
- *
- * @param participatingShardNames Shards which participate on the transaction
- * @return Future indicating the transaction has been readied on the backend
- */
- abstract @NonNull Future<ActorSelection> readyTransaction(Optional<SortedSet<String>> participatingShardNames);
-}
import org.opendaylight.controller.cluster.raft.DefaultConfigParamsImpl;
import org.opendaylight.controller.cluster.raft.PeerAddressResolver;
import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.config.distributed.datastore.provider.rev140612.DataStoreProperties.ExportOnRecovery;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.config.distributed.datastore.provider.rev231229.DataStoreProperties.ExportOnRecovery;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
private int shardBatchedModificationCount = DEFAULT_SHARD_BATCHED_MODIFICATION_COUNT;
private boolean writeOnlyTransactionOptimizationsEnabled = true;
private long shardCommitQueueExpiryTimeoutInMillis = DEFAULT_SHARD_COMMIT_QUEUE_EXPIRY_TIMEOUT_IN_MS;
- @Deprecated(since = "7.0.0", forRemoval = true)
- private boolean useTellBasedProtocol = true;
private boolean transactionDebugContextEnabled = false;
private String shardManagerPersistenceId;
private int maximumMessageSliceSize = DEFAULT_MAX_MESSAGE_SLICE_SIZE;
shardCommitQueueExpiryTimeoutInMillis = other.shardCommitQueueExpiryTimeoutInMillis;
transactionDebugContextEnabled = other.transactionDebugContextEnabled;
shardManagerPersistenceId = other.shardManagerPersistenceId;
- useTellBasedProtocol = other.useTellBasedProtocol;
backendAlivenessTimerInterval = other.backendAlivenessTimerInterval;
requestTimeout = other.requestTimeout;
noProgressTimeout = other.noProgressTimeout;
return transactionDebugContextEnabled;
}
- @Deprecated(since = "7.0.0", forRemoval = true)
- public boolean isUseTellBasedProtocol() {
- return useTellBasedProtocol;
- }
-
public boolean isUseLz4Compression() {
return useLz4Compression;
}
return this;
}
- @Deprecated(since = "7.0.0", forRemoval = true)
- public Builder useTellBasedProtocol(final boolean value) {
- datastoreContext.useTellBasedProtocol = value;
- return this;
- }
-
public Builder useLz4Compression(final boolean value) {
datastoreContext.useLz4Compression = value;
return this;
import org.apache.commons.text.WordUtils;
import org.checkerframework.checker.lock.qual.GuardedBy;
import org.opendaylight.controller.cluster.datastore.DatastoreContext.Builder;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.config.distributed.datastore.provider.rev140612.DataStoreProperties;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.config.distributed.datastore.provider.rev140612.DataStorePropertiesContainer;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.config.distributed.datastore.provider.rev231229.DataStoreProperties;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.config.distributed.datastore.provider.rev231229.DataStorePropertiesContainer;
import org.opendaylight.yangtools.yang.common.Uint16;
import org.opendaylight.yangtools.yang.common.Uint32;
import org.opendaylight.yangtools.yang.common.Uint64;
+++ /dev/null
-/*
- * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-package org.opendaylight.controller.cluster.datastore;
-
-import static java.util.Objects.requireNonNull;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.util.concurrent.FutureCallback;
-import com.google.common.util.concurrent.Futures;
-import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.MoreExecutors;
-import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
-import java.util.List;
-import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
-import org.opendaylight.mdsal.common.api.CommitInfo;
-import org.opendaylight.yangtools.yang.common.Empty;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import scala.concurrent.Future;
-
-/**
- * An AbstractThreePhaseCommitCohort implementation used for debugging. If a failure occurs, the transaction
- * call site is printed.
- *
- * @author Thomas Pantelis
- */
-class DebugThreePhaseCommitCohort extends AbstractThreePhaseCommitCohort<Object> {
- private static final Logger LOG = LoggerFactory.getLogger(DebugThreePhaseCommitCohort.class);
-
- private final AbstractThreePhaseCommitCohort<?> delegate;
- private final Throwable debugContext;
- private final TransactionIdentifier transactionId;
-
- @SuppressFBWarnings("SLF4J_LOGGER_SHOULD_BE_FINAL")
- private Logger log = LOG;
-
- DebugThreePhaseCommitCohort(final TransactionIdentifier transactionId,
- final AbstractThreePhaseCommitCohort<?> delegate, final Throwable debugContext) {
- this.delegate = requireNonNull(delegate);
- this.debugContext = requireNonNull(debugContext);
- this.transactionId = requireNonNull(transactionId);
- }
-
- private <V> ListenableFuture<V> addFutureCallback(final ListenableFuture<V> future) {
- Futures.addCallback(future, new FutureCallback<V>() {
- @Override
- public void onSuccess(final V result) {
- // no-op
- }
-
- @Override
- public void onFailure(final Throwable failure) {
- log.warn("Transaction {} failed with error \"{}\" - was allocated in the following context",
- transactionId, failure, debugContext);
- }
- }, MoreExecutors.directExecutor());
-
- return future;
- }
-
- @Override
- public ListenableFuture<Boolean> canCommit() {
- return addFutureCallback(delegate.canCommit());
- }
-
- @Override
- public ListenableFuture<Empty> preCommit() {
- return addFutureCallback(delegate.preCommit());
- }
-
- @Override
- public ListenableFuture<? extends CommitInfo> commit() {
- return addFutureCallback(delegate.commit());
- }
-
- @Override
- public ListenableFuture<Empty> abort() {
- return delegate.abort();
- }
-
- @SuppressWarnings({ "rawtypes", "unchecked" })
- @Override
- List<Future<Object>> getCohortFutures() {
- return ((AbstractThreePhaseCommitCohort)delegate).getCohortFutures();
- }
-
- @VisibleForTesting
- void setLogger(final Logger logger) {
- log = logger;
- }
-}
+++ /dev/null
-/*
- * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-package org.opendaylight.controller.cluster.datastore;
-
-import static com.google.common.base.Preconditions.checkState;
-
-import akka.actor.ActorSelection;
-import akka.dispatch.Futures;
-import java.util.AbstractMap.SimpleImmutableEntry;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
-import java.util.Map.Entry;
-import java.util.Optional;
-import java.util.SortedSet;
-import org.checkerframework.checker.lock.qual.GuardedBy;
-import org.eclipse.jdt.annotation.NonNull;
-import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
-import org.opendaylight.controller.cluster.datastore.utils.ActorUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import scala.concurrent.Future;
-import scala.concurrent.Promise;
-
-/**
- * Delayed implementation of TransactionContextWrapper. Operations destined for the target
- * TransactionContext instance are cached until the TransactionContext instance becomes
- * available at which time they are executed.
- *
- * @author Thomas Pantelis
- */
-final class DelayedTransactionContextWrapper extends AbstractTransactionContextWrapper {
- private static final Logger LOG = LoggerFactory.getLogger(DelayedTransactionContextWrapper.class);
-
- /**
- * The list of transaction operations to execute once the TransactionContext becomes available.
- */
- @GuardedBy("queuedTxOperations")
- private final List<Entry<TransactionOperation, Boolean>> queuedTxOperations = new ArrayList<>();
-
- /**
- * The resulting TransactionContext.
- */
- private volatile TransactionContext transactionContext;
- @GuardedBy("queuedTxOperations")
- private TransactionContext deferredTransactionContext;
- @GuardedBy("queuedTxOperations")
- private boolean pendingEnqueue;
-
- DelayedTransactionContextWrapper(@NonNull final TransactionIdentifier identifier,
- @NonNull final ActorUtils actorUtils, @NonNull final String shardName) {
- super(identifier, actorUtils, shardName);
- }
-
- @Override
- TransactionContext getTransactionContext() {
- return transactionContext;
- }
-
- @Override
- void maybeExecuteTransactionOperation(final TransactionOperation op) {
- final TransactionContext localContext = transactionContext;
- if (localContext != null) {
- op.invoke(localContext, null);
- } else {
- // The shard Tx hasn't been created yet so add the Tx operation to the Tx Future
- // callback to be executed after the Tx is created.
- enqueueTransactionOperation(op);
- }
- }
-
- @Override
- Future<ActorSelection> readyTransaction(final Optional<SortedSet<String>> participatingShardNames) {
- // avoid the creation of a promise and a TransactionOperation
- final TransactionContext localContext = transactionContext;
- if (localContext != null) {
- return localContext.readyTransaction(null, participatingShardNames);
- }
-
- final Promise<ActorSelection> promise = Futures.promise();
- enqueueTransactionOperation(new TransactionOperation() {
- @Override
- public void invoke(final TransactionContext newTransactionContext, final Boolean havePermit) {
- promise.completeWith(newTransactionContext.readyTransaction(havePermit, participatingShardNames));
- }
- });
-
- return promise.future();
- }
-
- /**
- * Adds a TransactionOperation to be executed once the TransactionContext becomes available. This method is called
- * only after the caller has checked (without synchronizing with executePriorTransactionOperations()) that the
- * context is not available.
- */
- private void enqueueTransactionOperation(final TransactionOperation operation) {
- // We have three things to do here:
- // - synchronize with executePriorTransactionOperations() so that logical operation ordering is maintained
- // - acquire a permit for the operation if we still need to enqueue it
- // - enqueue the operation
- //
- // Since each operation needs to acquire a permit exactly once and the limiter is shared between us and the
- // TransactionContext, we need to know whether an operation has a permit before we enqueue it. Further
- // complications are:
- // - this method may be called from the thread invoking executePriorTransactionOperations()
- // - user may be violating API contract of using the transaction from a single thread
-
- // As a first step, we will synchronize on the queue and check if the handoff has completed. While we have
- // the lock, we will assert that we will be enqueing another operation.
- final TransactionContext contextOnEntry;
- synchronized (queuedTxOperations) {
- contextOnEntry = transactionContext;
- if (contextOnEntry == null) {
- checkState(!pendingEnqueue, "Concurrent access to transaction %s detected", getIdentifier());
- pendingEnqueue = true;
- }
- }
-
- // Short-circuit if there is a context
- if (contextOnEntry != null) {
- operation.invoke(transactionContext, null);
- return;
- }
-
- boolean cleanupEnqueue = true;
- TransactionContext finishHandoff = null;
- try {
- // Acquire the permit,
- final boolean havePermit = getLimiter().acquire();
- if (!havePermit) {
- LOG.warn("Failed to acquire enqueue operation permit for transaction {} on shard {}", getIdentifier(),
- getShardName());
- }
-
- // Ready to enqueue, take the lock again and append the operation
- synchronized (queuedTxOperations) {
- LOG.debug("Tx {} Queuing TransactionOperation", getIdentifier());
- queuedTxOperations.add(new SimpleImmutableEntry<>(operation, havePermit));
- pendingEnqueue = false;
- cleanupEnqueue = false;
- finishHandoff = deferredTransactionContext;
- deferredTransactionContext = null;
- }
- } finally {
- if (cleanupEnqueue) {
- synchronized (queuedTxOperations) {
- pendingEnqueue = false;
- finishHandoff = deferredTransactionContext;
- deferredTransactionContext = null;
- }
- }
- if (finishHandoff != null) {
- executePriorTransactionOperations(finishHandoff);
- }
- }
- }
-
- void executePriorTransactionOperations(final TransactionContext localTransactionContext) {
- while (true) {
- // Access to queuedTxOperations and transactionContext must be protected and atomic
- // (ie synchronized) with respect to #addTxOperationOnComplete to handle timing
- // issues and ensure no TransactionOperation is missed and that they are processed
- // in the order they occurred.
-
- // We'll make a local copy of the queuedTxOperations list to handle re-entrancy
- // in case a TransactionOperation results in another transaction operation being
- // queued (eg a put operation from a client read Future callback that is notified
- // synchronously).
- final Collection<Entry<TransactionOperation, Boolean>> operationsBatch;
- synchronized (queuedTxOperations) {
- if (queuedTxOperations.isEmpty()) {
- if (!pendingEnqueue) {
- // We're done invoking the TransactionOperations so we can now publish the TransactionContext.
- localTransactionContext.operationHandOffComplete();
-
- // This is null-to-non-null transition after which we are releasing the lock and not doing
- // any further processing.
- transactionContext = localTransactionContext;
- } else {
- deferredTransactionContext = localTransactionContext;
- }
- return;
- }
-
- operationsBatch = new ArrayList<>(queuedTxOperations);
- queuedTxOperations.clear();
- }
-
- // Invoke TransactionOperations outside the sync block to avoid unnecessary blocking. A slight down-side is
- // that we need to re-acquire the lock below but this should be negligible.
- for (Entry<TransactionOperation, Boolean> oper : operationsBatch) {
- final Boolean permit = oper.getValue();
- if (permit.booleanValue() && !localTransactionContext.usesOperationLimiting()) {
- // If the context is not using limiting we need to release operations as we are queueing them, so
- // user threads are not charged for them.
- getLimiter().release();
- }
- oper.getKey().invoke(localTransactionContext, permit);
- }
- }
- }
-
-}
+++ /dev/null
-/*
- * Copyright (c) 2021 PANTHEON.tech, s.r.o. and others. All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-package org.opendaylight.controller.cluster.datastore;
-
-import static java.util.Objects.requireNonNull;
-
-import akka.actor.ActorSelection;
-import java.util.Optional;
-import java.util.SortedSet;
-import org.eclipse.jdt.annotation.NonNull;
-import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
-import org.opendaylight.controller.cluster.datastore.utils.ActorUtils;
-import scala.concurrent.Future;
-
-/**
- * Direct implementation of TransactionContextWrapper. Operation are executed directly on TransactionContext. Always
- * has completed context and executes on local shard.
- */
-final class DirectTransactionContextWrapper extends AbstractTransactionContextWrapper {
- private final TransactionContext transactionContext;
-
- DirectTransactionContextWrapper(@NonNull final TransactionIdentifier identifier,
- @NonNull final ActorUtils actorUtils,
- @NonNull final String shardName,
- @NonNull final TransactionContext transactionContext) {
- super(identifier, actorUtils, shardName);
- this.transactionContext = requireNonNull(transactionContext);
- }
-
- @Override
- TransactionContext getTransactionContext() {
- return transactionContext;
- }
-
- @Override
- void maybeExecuteTransactionOperation(final TransactionOperation op) {
- op.invoke(transactionContext, null);
- }
-
- @Override
- Future<ActorSelection> readyTransaction(final Optional<SortedSet<String>> participatingShardNames) {
- return transactionContext.readyTransaction(null, participatingShardNames);
- }
-}
+++ /dev/null
-/*
- * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-
-package org.opendaylight.controller.cluster.datastore;
-
-import akka.actor.ActorSystem;
-import org.opendaylight.controller.cluster.datastore.config.Configuration;
-import org.opendaylight.controller.cluster.datastore.persisted.DatastoreSnapshot;
-import org.opendaylight.mdsal.dom.spi.store.DOMStoreReadTransaction;
-import org.opendaylight.mdsal.dom.spi.store.DOMStoreReadWriteTransaction;
-import org.opendaylight.mdsal.dom.spi.store.DOMStoreTransactionChain;
-import org.opendaylight.mdsal.dom.spi.store.DOMStoreWriteTransaction;
-
-/**
- * Implements a distributed DOMStore using Akka {@code Patterns.ask()}.
- *
- * @deprecated This implementation is destined for removal,
- */
-@Deprecated(since = "7.0.0", forRemoval = true)
-public class DistributedDataStore extends AbstractDataStore {
- private final TransactionContextFactory txContextFactory;
-
- public DistributedDataStore(final ActorSystem actorSystem, final ClusterWrapper cluster,
- final Configuration configuration, final DatastoreContextFactory datastoreContextFactory,
- final DatastoreSnapshot restoreFromSnapshot) {
- super(actorSystem, cluster, configuration, datastoreContextFactory, restoreFromSnapshot);
- txContextFactory = new TransactionContextFactory(getActorUtils(), getIdentifier());
- }
-
- @Override
- public DOMStoreTransactionChain createTransactionChain() {
- return txContextFactory.createTransactionChain();
- }
-
- @Override
- public DOMStoreReadTransaction newReadOnlyTransaction() {
- return new TransactionProxy(txContextFactory, TransactionType.READ_ONLY);
- }
-
- @Override
- public DOMStoreWriteTransaction newWriteOnlyTransaction() {
- getActorUtils().acquireTxCreationPermit();
- return new TransactionProxy(txContextFactory, TransactionType.WRITE_ONLY);
- }
-
- @Override
- public DOMStoreReadWriteTransaction newReadWriteTransaction() {
- getActorUtils().acquireTxCreationPermit();
- return new TransactionProxy(txContextFactory, TransactionType.READ_WRITE);
- }
-
- @Override
- public void close() {
- txContextFactory.close();
- super.close();
- }
-}
*/
package org.opendaylight.controller.cluster.datastore;
-import akka.actor.ActorSystem;
import org.opendaylight.controller.cluster.ActorSystemProvider;
import org.opendaylight.controller.cluster.databroker.ClientBackedDataStore;
import org.opendaylight.controller.cluster.datastore.config.Configuration;
import org.opendaylight.controller.cluster.datastore.config.ConfigurationImpl;
-import org.opendaylight.controller.cluster.datastore.persisted.DatastoreSnapshot;
import org.opendaylight.mdsal.dom.api.DOMSchemaService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
final String datastoreName = initialDatastoreContext.getDataStoreName();
LOG.info("Create data store instance of type : {}", datastoreName);
- final ActorSystem actorSystem = actorSystemProvider.getActorSystem();
- final DatastoreSnapshot restoreFromSnapshot = datastoreSnapshotRestore.getAndRemove(datastoreName).orElse(null);
+ final var actorSystem = actorSystemProvider.getActorSystem();
+ final var restoreFromSnapshot = datastoreSnapshotRestore.getAndRemove(datastoreName).orElse(null);
final Configuration config;
if (orgConfig == null) {
} else {
config = orgConfig;
}
- final ClusterWrapper clusterWrapper = new ClusterWrapperImpl(actorSystem);
- final DatastoreContextFactory contextFactory = introspector.newContextFactory();
+ final var clusterWrapper = new ClusterWrapperImpl(actorSystem);
+ final var contextFactory = introspector.newContextFactory();
- // This is the potentially-updated datastore context, distinct from the initial one
- final DatastoreContext datastoreContext = contextFactory.getBaseDatastoreContext();
-
- final AbstractDataStore dataStore;
- if (datastoreContext.isUseTellBasedProtocol()) {
- dataStore = new ClientBackedDataStore(actorSystem, clusterWrapper, config, contextFactory,
- restoreFromSnapshot);
- LOG.info("Data store {} is using tell-based protocol", datastoreName);
- } else {
- dataStore = new DistributedDataStore(actorSystem, clusterWrapper, config, contextFactory,
- restoreFromSnapshot);
- LOG.warn("Data store {} is using ask-based protocol, which will be removed in the next major release",
- datastoreName);
- }
-
- return dataStore;
+ final var ret = new ClientBackedDataStore(actorSystem, clusterWrapper, config, contextFactory,
+ restoreFromSnapshot);
+ LOG.info("Data store {} is using tell-based protocol", datastoreName);
+ return ret;
}
}
+++ /dev/null
-/*
- * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-package org.opendaylight.controller.cluster.datastore;
-
-import static java.util.Objects.requireNonNull;
-
-import akka.actor.ActorSelection;
-import akka.dispatch.Futures;
-import akka.dispatch.OnComplete;
-import com.google.common.util.concurrent.ListenableFuture;
-import java.util.Optional;
-import java.util.SortedSet;
-import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
-import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
-import org.opendaylight.controller.cluster.datastore.messages.ReadyLocalTransaction;
-import org.opendaylight.controller.cluster.datastore.utils.ActorUtils;
-import org.opendaylight.mdsal.common.api.CommitInfo;
-import org.opendaylight.mdsal.dom.spi.store.DOMStoreThreePhaseCommitCohort;
-import org.opendaylight.mdsal.dom.spi.store.SnapshotBackedWriteTransaction;
-import org.opendaylight.yangtools.yang.common.Empty;
-import org.opendaylight.yangtools.yang.data.tree.api.DataTreeModification;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import scala.concurrent.Future;
-
-/**
- * Fake {@link DOMStoreThreePhaseCommitCohort} instantiated for local transactions to conform with the DOM
- * transaction APIs. It is only used to hold the data from a local DOM transaction ready operation and to
- * initiate direct or coordinated commits from the front-end by sending the ReadyLocalTransaction message.
- * It is not actually called by the front-end to perform 3PC thus the canCommit/preCommit/commit methods
- * are no-ops.
- */
-class LocalThreePhaseCommitCohort implements DOMStoreThreePhaseCommitCohort {
- private static final Logger LOG = LoggerFactory.getLogger(LocalThreePhaseCommitCohort.class);
-
- private final SnapshotBackedWriteTransaction<TransactionIdentifier> transaction;
- private final DataTreeModification modification;
- private final ActorUtils actorUtils;
- private final ActorSelection leader;
- private final Exception operationError;
-
- protected LocalThreePhaseCommitCohort(final ActorUtils actorUtils, final ActorSelection leader,
- final SnapshotBackedWriteTransaction<TransactionIdentifier> transaction,
- final DataTreeModification modification,
- final Exception operationError) {
- this.actorUtils = requireNonNull(actorUtils);
- this.leader = requireNonNull(leader);
- this.transaction = requireNonNull(transaction);
- this.modification = requireNonNull(modification);
- this.operationError = operationError;
- }
-
- protected LocalThreePhaseCommitCohort(final ActorUtils actorUtils, final ActorSelection leader,
- final SnapshotBackedWriteTransaction<TransactionIdentifier> transaction, final Exception operationError) {
- this.actorUtils = requireNonNull(actorUtils);
- this.leader = requireNonNull(leader);
- this.transaction = requireNonNull(transaction);
- this.operationError = requireNonNull(operationError);
- modification = null;
- }
-
- private Future<Object> initiateCommit(final boolean immediate,
- final Optional<SortedSet<String>> participatingShardNames) {
- if (operationError != null) {
- return Futures.failed(operationError);
- }
-
- final ReadyLocalTransaction message = new ReadyLocalTransaction(transaction.getIdentifier(),
- modification, immediate, participatingShardNames);
- return actorUtils.executeOperationAsync(leader, message, actorUtils.getTransactionCommitOperationTimeout());
- }
-
- Future<ActorSelection> initiateCoordinatedCommit(final Optional<SortedSet<String>> participatingShardNames) {
- final Future<Object> messageFuture = initiateCommit(false, participatingShardNames);
- final Future<ActorSelection> ret = TransactionReadyReplyMapper.transform(messageFuture, actorUtils,
- transaction.getIdentifier());
- ret.onComplete(new OnComplete<ActorSelection>() {
- @Override
- public void onComplete(final Throwable failure, final ActorSelection success) {
- if (failure != null) {
- LOG.warn("Failed to prepare transaction {} on backend", transaction.getIdentifier(), failure);
- transactionAborted(transaction);
- return;
- }
-
- LOG.debug("Transaction {} resolved to actor {}", transaction.getIdentifier(), success);
- }
- }, actorUtils.getClientDispatcher());
-
- return ret;
- }
-
- Future<Object> initiateDirectCommit() {
- final Future<Object> messageFuture = initiateCommit(true, Optional.empty());
- messageFuture.onComplete(new OnComplete<>() {
- @Override
- public void onComplete(final Throwable failure, final Object message) {
- if (failure != null) {
- LOG.warn("Failed to prepare transaction {} on backend", transaction.getIdentifier(), failure);
- transactionAborted(transaction);
- } else if (CommitTransactionReply.isSerializedType(message)) {
- LOG.debug("Transaction {} committed successfully", transaction.getIdentifier());
- transactionCommitted(transaction);
- } else {
- LOG.error("Transaction {} resulted in unhandled message type {}, aborting",
- transaction.getIdentifier(), message.getClass());
- transactionAborted(transaction);
- }
- }
- }, actorUtils.getClientDispatcher());
-
- return messageFuture;
- }
-
- @Override
- public final ListenableFuture<Boolean> canCommit() {
- // Intended no-op
- throw new UnsupportedOperationException();
- }
-
- @Override
- public final ListenableFuture<Empty> preCommit() {
- // Intended no-op
- throw new UnsupportedOperationException();
- }
-
- @Override
- public final ListenableFuture<Empty> abort() {
- // Intended no-op
- throw new UnsupportedOperationException();
- }
-
- @Override
- public final ListenableFuture<CommitInfo> commit() {
- // Intended no-op
- throw new UnsupportedOperationException();
- }
-
- protected void transactionAborted(final SnapshotBackedWriteTransaction<TransactionIdentifier> aborted) {
- }
-
- protected void transactionCommitted(final SnapshotBackedWriteTransaction<TransactionIdentifier> comitted) {
- }
-}
+++ /dev/null
-/*
- * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-package org.opendaylight.controller.cluster.datastore;
-
-import static com.google.common.base.Preconditions.checkArgument;
-import static java.util.Objects.requireNonNull;
-
-import akka.actor.ActorSelection;
-import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
-import org.opendaylight.mdsal.dom.spi.store.AbstractSnapshotBackedTransactionChain;
-import org.opendaylight.mdsal.dom.spi.store.DOMStoreReadTransaction;
-import org.opendaylight.mdsal.dom.spi.store.DOMStoreReadWriteTransaction;
-import org.opendaylight.mdsal.dom.spi.store.DOMStoreThreePhaseCommitCohort;
-import org.opendaylight.mdsal.dom.spi.store.DOMStoreWriteTransaction;
-import org.opendaylight.mdsal.dom.spi.store.SnapshotBackedWriteTransaction;
-import org.opendaylight.yangtools.yang.data.tree.api.DataTreeModification;
-import org.opendaylight.yangtools.yang.data.tree.api.DataTreeSnapshot;
-import org.opendaylight.yangtools.yang.data.tree.api.ReadOnlyDataTree;
-
-/**
- * Transaction chain instantiated on top of a locally-available DataTree. It does not instantiate
- * a transaction in the leader and rather chains transactions on top of themselves.
- */
-final class LocalTransactionChain extends AbstractSnapshotBackedTransactionChain<TransactionIdentifier>
- implements LocalTransactionFactory {
- private static final Throwable ABORTED = new Throwable("Transaction aborted");
- private final TransactionChainProxy parent;
- private final ActorSelection leader;
- private final ReadOnlyDataTree tree;
-
- LocalTransactionChain(final TransactionChainProxy parent, final ActorSelection leader,
- final ReadOnlyDataTree tree) {
- this.parent = requireNonNull(parent);
- this.leader = requireNonNull(leader);
- this.tree = requireNonNull(tree);
- }
-
- ReadOnlyDataTree getDataTree() {
- return tree;
- }
-
- @Override
- protected TransactionIdentifier nextTransactionIdentifier() {
- throw new UnsupportedOperationException();
- }
-
- @Override
- protected boolean getDebugTransactions() {
- return false;
- }
-
- @Override
- protected DataTreeSnapshot takeSnapshot() {
- return tree.takeSnapshot();
- }
-
- @Override
- protected DOMStoreThreePhaseCommitCohort createCohort(
- final SnapshotBackedWriteTransaction<TransactionIdentifier> transaction,
- final DataTreeModification modification,
- final Exception operationError) {
- return new LocalChainThreePhaseCommitCohort(transaction, modification, operationError);
- }
-
- @Override
- public DOMStoreReadTransaction newReadOnlyTransaction(final TransactionIdentifier identifier) {
- return super.newReadOnlyTransaction(identifier);
- }
-
- @Override
- public DOMStoreReadWriteTransaction newReadWriteTransaction(final TransactionIdentifier identifier) {
- return super.newReadWriteTransaction(identifier);
- }
-
- @Override
- public DOMStoreWriteTransaction newWriteOnlyTransaction(final TransactionIdentifier identifier) {
- return super.newWriteOnlyTransaction(identifier);
- }
-
- @SuppressWarnings({"unchecked", "checkstyle:IllegalCatch"})
- @Override
- public LocalThreePhaseCommitCohort onTransactionReady(final DOMStoreWriteTransaction tx,
- final Exception operationError) {
- checkArgument(tx instanceof SnapshotBackedWriteTransaction);
- if (operationError != null) {
- return new LocalChainThreePhaseCommitCohort((SnapshotBackedWriteTransaction<TransactionIdentifier>)tx,
- operationError);
- }
-
- try {
- return (LocalThreePhaseCommitCohort) tx.ready();
- } catch (Exception e) {
- // Unfortunately we need to cast to SnapshotBackedWriteTransaction here as it's required by
- // LocalThreePhaseCommitCohort and the base class.
- return new LocalChainThreePhaseCommitCohort((SnapshotBackedWriteTransaction<TransactionIdentifier>)tx, e);
- }
- }
-
- private class LocalChainThreePhaseCommitCohort extends LocalThreePhaseCommitCohort {
-
- protected LocalChainThreePhaseCommitCohort(
- final SnapshotBackedWriteTransaction<TransactionIdentifier> transaction,
- final DataTreeModification modification, final Exception operationError) {
- super(parent.getActorUtils(), leader, transaction, modification, operationError);
- }
-
- protected LocalChainThreePhaseCommitCohort(
- final SnapshotBackedWriteTransaction<TransactionIdentifier> transaction,
- final Exception operationError) {
- super(parent.getActorUtils(), leader, transaction, operationError);
- }
-
- @Override
- protected void transactionAborted(final SnapshotBackedWriteTransaction<TransactionIdentifier> transaction) {
- onTransactionFailed(transaction, ABORTED);
- }
-
- @Override
- protected void transactionCommitted(final SnapshotBackedWriteTransaction<TransactionIdentifier> transaction) {
- onTransactionCommited(transaction);
- }
- }
-}
+++ /dev/null
-/*
- * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-package org.opendaylight.controller.cluster.datastore;
-
-import static java.util.Objects.requireNonNull;
-
-import akka.actor.ActorSelection;
-import com.google.common.util.concurrent.FutureCallback;
-import com.google.common.util.concurrent.Futures;
-import com.google.common.util.concurrent.MoreExecutors;
-import com.google.common.util.concurrent.SettableFuture;
-import java.util.Optional;
-import java.util.SortedSet;
-import java.util.function.Consumer;
-import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
-import org.opendaylight.controller.cluster.datastore.messages.AbstractRead;
-import org.opendaylight.mdsal.common.api.ReadFailedException;
-import org.opendaylight.mdsal.dom.spi.store.DOMStoreReadTransaction;
-import org.opendaylight.mdsal.dom.spi.store.DOMStoreTransaction;
-import org.opendaylight.mdsal.dom.spi.store.DOMStoreWriteTransaction;
-import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
-import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
-import scala.concurrent.Future;
-
-/**
- * Processes front-end transaction operations locally before being committed to the destination shard.
- * Instances of this class are used when the destination shard is local to the caller.
- *
- * @author Thomas Pantelis
- */
-abstract class LocalTransactionContext extends TransactionContext {
- private final DOMStoreTransaction txDelegate;
- private final LocalTransactionReadySupport readySupport;
- private Exception operationError;
-
- LocalTransactionContext(final DOMStoreTransaction txDelegate, final TransactionIdentifier identifier,
- final LocalTransactionReadySupport readySupport) {
- super(identifier);
- this.txDelegate = requireNonNull(txDelegate);
- this.readySupport = readySupport;
- }
-
- abstract DOMStoreWriteTransaction getWriteDelegate();
-
- abstract DOMStoreReadTransaction getReadDelegate();
-
- @SuppressWarnings("checkstyle:IllegalCatch")
- private void executeModification(final Consumer<DOMStoreWriteTransaction> consumer) {
- incrementModificationCount();
- if (operationError == null) {
- try {
- consumer.accept(getWriteDelegate());
- } catch (Exception e) {
- operationError = e;
- }
- }
- }
-
- @Override
- void executeDelete(final YangInstanceIdentifier path, final Boolean havePermit) {
- executeModification(transaction -> transaction.delete(path));
- }
-
- @Override
- void executeMerge(final YangInstanceIdentifier path, final NormalizedNode data, final Boolean havePermit) {
- executeModification(transaction -> transaction.merge(path, data));
- }
-
- @Override
- void executeWrite(final YangInstanceIdentifier path, final NormalizedNode data, final Boolean havePermit) {
- executeModification(transaction -> transaction.write(path, data));
- }
-
- @Override
- <T> void executeRead(final AbstractRead<T> readCmd, final SettableFuture<T> proxyFuture,
- final Boolean havePermit) {
- Futures.addCallback(readCmd.apply(getReadDelegate()), new FutureCallback<T>() {
- @Override
- public void onSuccess(final T result) {
- proxyFuture.set(result);
- }
-
- @Override
- public void onFailure(final Throwable failure) {
- proxyFuture.setException(failure instanceof Exception
- ? ReadFailedException.MAPPER.apply((Exception) failure) : failure);
- }
- }, MoreExecutors.directExecutor());
- }
-
- @Override
- Future<ActorSelection> readyTransaction(final Boolean havePermit,
- final Optional<SortedSet<String>> participatingShardNames) {
- final LocalThreePhaseCommitCohort cohort = ready();
- return cohort.initiateCoordinatedCommit(participatingShardNames);
- }
-
- @Override
- Future<Object> directCommit(final Boolean havePermit) {
- final LocalThreePhaseCommitCohort cohort = ready();
- return cohort.initiateDirectCommit();
- }
-
- @Override
- void closeTransaction() {
- txDelegate.close();
- }
-
- private LocalThreePhaseCommitCohort ready() {
- logModificationCount();
- return readySupport.onTransactionReady(getWriteDelegate(), operationError);
- }
-}
+++ /dev/null
-/*
- * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-package org.opendaylight.controller.cluster.datastore;
-
-import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
-import org.opendaylight.mdsal.dom.spi.store.DOMStoreReadTransaction;
-import org.opendaylight.mdsal.dom.spi.store.DOMStoreReadWriteTransaction;
-import org.opendaylight.mdsal.dom.spi.store.DOMStoreWriteTransaction;
-
-/**
- * A factory for creating local transactions used by {@link AbstractTransactionContextFactory} to instantiate
- * transactions on shards which are co-located with the shard leader.
- *
- * @author Thomas Pantelis
- */
-interface LocalTransactionFactory extends LocalTransactionReadySupport {
- DOMStoreReadTransaction newReadOnlyTransaction(TransactionIdentifier identifier);
-
- DOMStoreReadWriteTransaction newReadWriteTransaction(TransactionIdentifier identifier);
-
- DOMStoreWriteTransaction newWriteOnlyTransaction(TransactionIdentifier identifier);
-}
\ No newline at end of file
+++ /dev/null
-/*
- * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-package org.opendaylight.controller.cluster.datastore;
-
-import static com.google.common.base.Preconditions.checkArgument;
-import static java.util.Objects.requireNonNull;
-
-import akka.actor.ActorSelection;
-import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
-import org.opendaylight.controller.cluster.datastore.utils.ActorUtils;
-import org.opendaylight.mdsal.dom.spi.store.DOMStoreReadTransaction;
-import org.opendaylight.mdsal.dom.spi.store.DOMStoreReadWriteTransaction;
-import org.opendaylight.mdsal.dom.spi.store.DOMStoreThreePhaseCommitCohort;
-import org.opendaylight.mdsal.dom.spi.store.DOMStoreWriteTransaction;
-import org.opendaylight.mdsal.dom.spi.store.SnapshotBackedTransactions;
-import org.opendaylight.mdsal.dom.spi.store.SnapshotBackedWriteTransaction;
-import org.opendaylight.mdsal.dom.spi.store.SnapshotBackedWriteTransaction.TransactionReadyPrototype;
-import org.opendaylight.yangtools.yang.data.tree.api.DataTreeModification;
-import org.opendaylight.yangtools.yang.data.tree.api.ReadOnlyDataTree;
-
-/**
- * {@link LocalTransactionFactory} for instantiating backing transactions which are
- * disconnected from each other, ie not chained. These are used by {@link AbstractTransactionContextFactory}
- * to instantiate transactions on shards which are co-located with the shard leader.
- */
-final class LocalTransactionFactoryImpl extends TransactionReadyPrototype<TransactionIdentifier>
- implements LocalTransactionFactory {
-
- private final ActorSelection leader;
- private final ReadOnlyDataTree dataTree;
- private final ActorUtils actorUtils;
-
- LocalTransactionFactoryImpl(final ActorUtils actorUtils, final ActorSelection leader,
- final ReadOnlyDataTree dataTree) {
- this.leader = requireNonNull(leader);
- this.dataTree = requireNonNull(dataTree);
- this.actorUtils = actorUtils;
- }
-
- ReadOnlyDataTree getDataTree() {
- return dataTree;
- }
-
- @Override
- public DOMStoreReadTransaction newReadOnlyTransaction(final TransactionIdentifier identifier) {
- return SnapshotBackedTransactions.newReadTransaction(identifier, false, dataTree.takeSnapshot());
- }
-
- @Override
- public DOMStoreReadWriteTransaction newReadWriteTransaction(final TransactionIdentifier identifier) {
- return SnapshotBackedTransactions.newReadWriteTransaction(identifier, false, dataTree.takeSnapshot(), this);
- }
-
- @Override
- public DOMStoreWriteTransaction newWriteOnlyTransaction(final TransactionIdentifier identifier) {
- return SnapshotBackedTransactions.newWriteTransaction(identifier, false, dataTree.takeSnapshot(), this);
- }
-
- @Override
- protected void transactionAborted(final SnapshotBackedWriteTransaction<TransactionIdentifier> tx) {
- // No-op
- }
-
- @Override
- protected DOMStoreThreePhaseCommitCohort transactionReady(
- final SnapshotBackedWriteTransaction<TransactionIdentifier> tx,
- final DataTreeModification tree,
- final Exception readyError) {
- return new LocalThreePhaseCommitCohort(actorUtils, leader, tx, tree, readyError);
- }
-
- @SuppressWarnings("unchecked")
- @Override
- public LocalThreePhaseCommitCohort onTransactionReady(final DOMStoreWriteTransaction tx,
- final Exception operationError) {
- checkArgument(tx instanceof SnapshotBackedWriteTransaction);
- if (operationError != null) {
- return new LocalThreePhaseCommitCohort(actorUtils, leader,
- (SnapshotBackedWriteTransaction<TransactionIdentifier>)tx, operationError);
- }
-
- return (LocalThreePhaseCommitCohort) tx.ready();
- }
-}
+++ /dev/null
-/*
- * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-package org.opendaylight.controller.cluster.datastore;
-
-import org.eclipse.jdt.annotation.NonNull;
-import org.eclipse.jdt.annotation.Nullable;
-import org.opendaylight.mdsal.dom.spi.store.DOMStoreWriteTransaction;
-
-/**
- * Interface for a class that can "ready" a transaction.
- *
- * @author Thomas Pantelis
- */
-interface LocalTransactionReadySupport {
- LocalThreePhaseCommitCohort onTransactionReady(@NonNull DOMStoreWriteTransaction tx,
- @Nullable Exception operationError);
-}
+++ /dev/null
-/*
- * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-package org.opendaylight.controller.cluster.datastore;
-
-import com.google.common.util.concurrent.ListenableFuture;
-import java.util.Collections;
-import java.util.List;
-import org.opendaylight.mdsal.common.api.CommitInfo;
-import org.opendaylight.yangtools.yang.common.Empty;
-import scala.concurrent.Future;
-
-/**
- * A {@link org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort}
- * instance given out for empty transactions.
- */
-final class NoOpDOMStoreThreePhaseCommitCohort extends AbstractThreePhaseCommitCohort<Object> {
- static final NoOpDOMStoreThreePhaseCommitCohort INSTANCE = new NoOpDOMStoreThreePhaseCommitCohort();
-
- private NoOpDOMStoreThreePhaseCommitCohort() {
- // Hidden to prevent instantiation
- }
-
- @Override
- public ListenableFuture<Boolean> canCommit() {
- return IMMEDIATE_BOOLEAN_SUCCESS;
- }
-
- @Override
- public ListenableFuture<Empty> preCommit() {
- return IMMEDIATE_EMPTY_SUCCESS;
- }
-
- @Override
- public ListenableFuture<Empty> abort() {
- return IMMEDIATE_EMPTY_SUCCESS;
- }
-
- @Override
- public ListenableFuture<CommitInfo> commit() {
- return CommitInfo.emptyFluentFuture();
- }
-
- @Override
- List<Future<Object>> getCohortFutures() {
- return Collections.emptyList();
- }
-}
+++ /dev/null
-/*
- * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-package org.opendaylight.controller.cluster.datastore;
-
-import akka.actor.ActorSelection;
-import com.google.common.util.concurrent.SettableFuture;
-import java.util.Optional;
-import java.util.SortedSet;
-import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
-import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
-import org.opendaylight.controller.cluster.datastore.messages.AbstractRead;
-import org.opendaylight.mdsal.common.api.DataStoreUnavailableException;
-import org.opendaylight.mdsal.common.api.ReadFailedException;
-import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
-import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import scala.concurrent.Future;
-
-final class NoOpTransactionContext extends TransactionContext {
- private static final Logger LOG = LoggerFactory.getLogger(NoOpTransactionContext.class);
-
- private final Throwable failure;
-
- NoOpTransactionContext(final Throwable failure, final TransactionIdentifier identifier) {
- super(identifier);
- this.failure = failure;
- }
-
- @Override
- void closeTransaction() {
- LOG.debug("NoOpTransactionContext {} closeTransaction called", getIdentifier());
- }
-
- @Override
- Future<Object> directCommit(final Boolean havePermit) {
- LOG.debug("Tx {} directCommit called, failure", getIdentifier(), failure);
- return akka.dispatch.Futures.failed(failure);
- }
-
- @Override
- Future<ActorSelection> readyTransaction(final Boolean havePermit,
- final Optional<SortedSet<String>> participatingShardNamess) {
- LOG.debug("Tx {} readyTransaction called, failure", getIdentifier(), failure);
- return akka.dispatch.Futures.failed(failure);
- }
-
- @Override
- <T> void executeRead(final AbstractRead<T> readCmd, final SettableFuture<T> proxyFuture, final Boolean havePermit) {
- LOG.debug("Tx {} executeRead {} called path = {}", getIdentifier(), readCmd.getClass().getSimpleName(),
- readCmd.getPath());
-
- final Throwable t;
- if (failure instanceof NoShardLeaderException) {
- t = new DataStoreUnavailableException(failure.getMessage(), failure);
- } else {
- t = failure;
- }
- proxyFuture.setException(new ReadFailedException("Error executeRead " + readCmd.getClass().getSimpleName()
- + " for path " + readCmd.getPath(), t));
- }
-
- @Override
- void executeDelete(final YangInstanceIdentifier path, final Boolean havePermit) {
- LOG.debug("Tx {} executeDelete called path = {}", getIdentifier(), path);
- }
-
- @Override
- void executeMerge(final YangInstanceIdentifier path, final NormalizedNode data, final Boolean havePermit) {
- LOG.debug("Tx {} executeMerge called path = {}", getIdentifier(), path);
- }
-
- @Override
- void executeWrite(final YangInstanceIdentifier path, final NormalizedNode data, final Boolean havePermit) {
- LOG.debug("Tx {} executeWrite called path = {}", getIdentifier(), path);
- }
-}
+++ /dev/null
-/*
- * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-package org.opendaylight.controller.cluster.datastore;
-
-import static com.google.common.base.Preconditions.checkArgument;
-import static java.util.Objects.requireNonNull;
-
-import com.google.common.annotations.VisibleForTesting;
-import java.util.concurrent.Semaphore;
-import java.util.concurrent.TimeUnit;
-import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Class for limiting operations.
- */
-public class OperationLimiter {
- private static final Logger LOG = LoggerFactory.getLogger(OperationLimiter.class);
- private final TransactionIdentifier identifier;
- private final long acquireTimeout;
- private final Semaphore semaphore;
- private final int maxPermits;
-
- OperationLimiter(final TransactionIdentifier identifier, final int maxPermits, final long acquireTimeoutSeconds) {
- this.identifier = requireNonNull(identifier);
-
- checkArgument(acquireTimeoutSeconds >= 0);
- this.acquireTimeout = TimeUnit.SECONDS.toNanos(acquireTimeoutSeconds);
-
- checkArgument(maxPermits >= 0);
- this.maxPermits = maxPermits;
- this.semaphore = new Semaphore(maxPermits);
- }
-
- boolean acquire() {
- return acquire(1);
- }
-
- boolean acquire(final int acquirePermits) {
- try {
- if (semaphore.tryAcquire(acquirePermits, acquireTimeout, TimeUnit.NANOSECONDS)) {
- return true;
- }
- } catch (InterruptedException e) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Interrupted when trying to acquire operation permit for transaction {}", identifier, e);
- } else {
- LOG.warn("Interrupted when trying to acquire operation permit for transaction {}", identifier);
- }
- }
-
- return false;
- }
-
- void release() {
- release(1);
- }
-
- void release(final int permits) {
- this.semaphore.release(permits);
- }
-
- @VisibleForTesting
- TransactionIdentifier getIdentifier() {
- return identifier;
- }
-
- @VisibleForTesting
- int availablePermits() {
- return semaphore.availablePermits();
- }
-
- /**
- * Release all the permits.
- */
- public void releaseAll() {
- this.semaphore.release(maxPermits - availablePermits());
- }
-}
+++ /dev/null
-/*
- * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
- * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-package org.opendaylight.controller.cluster.datastore;
-
-import static com.google.common.base.Preconditions.checkState;
-import static java.util.Objects.requireNonNull;
-
-import akka.actor.ActorSelection;
-import akka.dispatch.Futures;
-import akka.dispatch.OnComplete;
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.util.concurrent.SettableFuture;
-import java.util.Optional;
-import java.util.SortedSet;
-import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
-import org.opendaylight.controller.cluster.datastore.messages.AbstractRead;
-import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
-import org.opendaylight.controller.cluster.datastore.messages.CloseTransaction;
-import org.opendaylight.controller.cluster.datastore.modification.AbstractModification;
-import org.opendaylight.controller.cluster.datastore.modification.DeleteModification;
-import org.opendaylight.controller.cluster.datastore.modification.MergeModification;
-import org.opendaylight.controller.cluster.datastore.modification.Modification;
-import org.opendaylight.controller.cluster.datastore.modification.WriteModification;
-import org.opendaylight.controller.cluster.datastore.utils.ActorUtils;
-import org.opendaylight.mdsal.common.api.ReadFailedException;
-import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
-import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import scala.concurrent.Future;
-
-/**
- * Redirects front-end transaction operations to a shard for processing. Instances of this class are used
- * when the destination shard is remote to the caller.
- *
- * @author Thomas Pantelis
- */
-final class RemoteTransactionContext extends TransactionContext {
- private static final Logger LOG = LoggerFactory.getLogger(RemoteTransactionContext.class);
-
- private final ActorUtils actorUtils;
- private final ActorSelection actor;
- private final OperationLimiter limiter;
-
- private BatchedModifications batchedModifications;
- private int totalBatchedModificationsSent;
- private int batchPermits;
-
- /**
- * We have observed a failed modification batch. This transaction context is effectively doomed, as the backend
- * does not have a correct view of the world. If this happens, we do not limit operations but rather short-cut them
- * to a either a no-op (modifications) or a failure (reads). Once the transaction is ready, though, we send the
- * message to resynchronize with the backend, sharing a 'lost message' failure path.
- */
- private volatile Throwable failedModification;
-
- RemoteTransactionContext(final TransactionIdentifier identifier, final ActorSelection actor,
- final ActorUtils actorUtils, final short remoteTransactionVersion, final OperationLimiter limiter) {
- super(identifier, remoteTransactionVersion);
- this.limiter = requireNonNull(limiter);
- this.actor = actor;
- this.actorUtils = actorUtils;
- }
-
- private ActorSelection getActor() {
- return actor;
- }
-
- protected ActorUtils getActorUtils() {
- return actorUtils;
- }
-
- @Override
- void closeTransaction() {
- LOG.debug("Tx {} closeTransaction called", getIdentifier());
- TransactionContextCleanup.untrack(this);
-
- actorUtils.sendOperationAsync(getActor(), new CloseTransaction(getTransactionVersion()).toSerializable());
- }
-
- @Override
- Future<Object> directCommit(final Boolean havePermit) {
- LOG.debug("Tx {} directCommit called", getIdentifier());
-
- // Send the remaining batched modifications, if any, with the ready flag set.
- bumpPermits(havePermit);
- return sendBatchedModifications(true, true, Optional.empty());
- }
-
- @Override
- Future<ActorSelection> readyTransaction(final Boolean havePermit,
- final Optional<SortedSet<String>> participatingShardNames) {
- logModificationCount();
-
- LOG.debug("Tx {} readyTransaction called", getIdentifier());
-
- // Send the remaining batched modifications, if any, with the ready flag set.
-
- bumpPermits(havePermit);
- Future<Object> lastModificationsFuture = sendBatchedModifications(true, false, participatingShardNames);
-
- // Transform the last reply Future into a Future that returns the cohort actor path from
- // the last reply message. That's the end result of the ready operation.
- return TransactionReadyReplyMapper.transform(lastModificationsFuture, actorUtils, getIdentifier());
- }
-
- private void bumpPermits(final Boolean havePermit) {
- if (Boolean.TRUE.equals(havePermit)) {
- ++batchPermits;
- }
- }
-
- private BatchedModifications newBatchedModifications() {
- return new BatchedModifications(getIdentifier(), getTransactionVersion());
- }
-
- private void batchModification(final Modification modification, final boolean havePermit) {
- incrementModificationCount();
- if (havePermit) {
- ++batchPermits;
- }
-
- if (batchedModifications == null) {
- batchedModifications = newBatchedModifications();
- }
-
- batchedModifications.addModification(modification);
-
- if (batchedModifications.getModifications().size()
- >= actorUtils.getDatastoreContext().getShardBatchedModificationCount()) {
- sendBatchedModifications();
- }
- }
-
- @VisibleForTesting
- Future<Object> sendBatchedModifications() {
- return sendBatchedModifications(false, false, Optional.empty());
- }
-
- private Future<Object> sendBatchedModifications(final boolean ready, final boolean doCommitOnReady,
- final Optional<SortedSet<String>> participatingShardNames) {
- Future<Object> sent = null;
- if (ready || batchedModifications != null && !batchedModifications.getModifications().isEmpty()) {
- if (batchedModifications == null) {
- batchedModifications = newBatchedModifications();
- }
-
- LOG.debug("Tx {} sending {} batched modifications, ready: {}", getIdentifier(),
- batchedModifications.getModifications().size(), ready);
-
- batchedModifications.setDoCommitOnReady(doCommitOnReady);
- batchedModifications.setTotalMessagesSent(++totalBatchedModificationsSent);
-
- final BatchedModifications toSend = batchedModifications;
- final int permitsToRelease = batchPermits;
- batchPermits = 0;
-
- if (ready) {
- batchedModifications.setReady(participatingShardNames);
- batchedModifications.setDoCommitOnReady(doCommitOnReady);
- batchedModifications = null;
- } else {
- batchedModifications = newBatchedModifications();
-
- final Throwable failure = failedModification;
- if (failure != null) {
- // We have observed a modification failure, it does not make sense to send this batch. This speeds
- // up the time when the application could be blocked due to messages timing out and operation
- // limiter kicking in.
- LOG.debug("Tx {} modifications previously failed, not sending a non-ready batch", getIdentifier());
- limiter.release(permitsToRelease);
- return Futures.failed(failure);
- }
- }
-
- sent = actorUtils.executeOperationAsync(getActor(), toSend.toSerializable(),
- actorUtils.getTransactionCommitOperationTimeout());
- sent.onComplete(new OnComplete<>() {
- @Override
- public void onComplete(final Throwable failure, final Object success) {
- if (failure != null) {
- LOG.debug("Tx {} modifications failed", getIdentifier(), failure);
- failedModification = failure;
- } else {
- LOG.debug("Tx {} modifications completed with {}", getIdentifier(), success);
- }
- limiter.release(permitsToRelease);
- }
- }, actorUtils.getClientDispatcher());
- }
-
- return sent;
- }
-
- @Override
- void executeDelete(final YangInstanceIdentifier path, final Boolean havePermit) {
- LOG.debug("Tx {} executeDelete called path = {}", getIdentifier(), path);
- executeModification(new DeleteModification(path), havePermit);
- }
-
- @Override
- void executeMerge(final YangInstanceIdentifier path, final NormalizedNode data, final Boolean havePermit) {
- LOG.debug("Tx {} executeMerge called path = {}", getIdentifier(), path);
- executeModification(new MergeModification(path, data), havePermit);
- }
-
- @Override
- void executeWrite(final YangInstanceIdentifier path, final NormalizedNode data, final Boolean havePermit) {
- LOG.debug("Tx {} executeWrite called path = {}", getIdentifier(), path);
- executeModification(new WriteModification(path, data), havePermit);
- }
-
- private void executeModification(final AbstractModification modification, final Boolean havePermit) {
- final boolean permitToRelease;
- if (havePermit == null) {
- permitToRelease = failedModification == null && acquireOperation();
- } else {
- permitToRelease = havePermit;
- }
-
- batchModification(modification, permitToRelease);
- }
-
- @Override
- <T> void executeRead(final AbstractRead<T> readCmd, final SettableFuture<T> returnFuture,
- final Boolean havePermit) {
- LOG.debug("Tx {} executeRead {} called path = {}", getIdentifier(), readCmd.getClass().getSimpleName(),
- readCmd.getPath());
-
- final Throwable failure = failedModification;
- if (failure != null) {
- // If we know there was a previous modification failure, we must not send a read request, as it risks
- // returning incorrect data. We check this before acquiring an operation simply because we want the app
- // to complete this transaction as soon as possible.
- returnFuture.setException(new ReadFailedException("Previous modification failed, cannot "
- + readCmd.getClass().getSimpleName() + " for path " + readCmd.getPath(), failure));
- return;
- }
-
- // Send any batched modifications. This is necessary to honor the read uncommitted semantics of the
- // public API contract.
-
- final boolean permitToRelease = havePermit == null ? acquireOperation() : havePermit;
- sendBatchedModifications();
-
- OnComplete<Object> onComplete = new OnComplete<>() {
- @Override
- public void onComplete(final Throwable failure, final Object response) {
- // We have previously acquired an operation, now release it, no matter what happened
- if (permitToRelease) {
- limiter.release();
- }
-
- if (failure != null) {
- LOG.debug("Tx {} {} operation failed", getIdentifier(), readCmd.getClass().getSimpleName(),
- failure);
-
- returnFuture.setException(new ReadFailedException("Error checking "
- + readCmd.getClass().getSimpleName() + " for path " + readCmd.getPath(), failure));
- } else {
- LOG.debug("Tx {} {} operation succeeded", getIdentifier(), readCmd.getClass().getSimpleName());
- readCmd.processResponse(response, returnFuture);
- }
- }
- };
-
- final Future<Object> future = actorUtils.executeOperationAsync(getActor(),
- readCmd.asVersion(getTransactionVersion()).toSerializable(), actorUtils.getOperationTimeout());
- future.onComplete(onComplete, actorUtils.getClientDispatcher());
- }
-
- /**
- * Acquire operation from the limiter if the hand-off has completed. If the hand-off is still ongoing, this method
- * does nothing.
- *
- * @return True if a permit was successfully acquired, false otherwise
- */
- private boolean acquireOperation() {
- checkState(isOperationHandOffComplete(),
- "Attempted to acquire execute operation permit for transaction %s on actor %s during handoff",
- getIdentifier(), actor);
-
- if (limiter.acquire()) {
- return true;
- }
-
- LOG.warn("Failed to acquire execute operation permit for transaction {} on actor {}", getIdentifier(), actor);
- return false;
- }
-
- @Override
- boolean usesOperationLimiting() {
- return true;
- }
-}
+++ /dev/null
-/*
- * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved.
- * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-package org.opendaylight.controller.cluster.datastore;
-
-import static java.util.Objects.requireNonNull;
-
-import akka.actor.ActorSelection;
-import akka.dispatch.OnComplete;
-import akka.pattern.AskTimeoutException;
-import akka.util.Timeout;
-import java.util.concurrent.TimeUnit;
-import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
-import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
-import org.opendaylight.controller.cluster.datastore.exceptions.ShardLeaderNotRespondingException;
-import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction;
-import org.opendaylight.controller.cluster.datastore.messages.CreateTransactionReply;
-import org.opendaylight.controller.cluster.datastore.messages.PrimaryShardInfo;
-import org.opendaylight.controller.cluster.datastore.utils.ActorUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import scala.concurrent.Future;
-import scala.concurrent.duration.FiniteDuration;
-
-/**
- * Handles creation of TransactionContext instances for remote transactions. This class creates
- * remote transactions, if necessary, by sending CreateTransaction messages with retries, up to a limit,
- * if the shard doesn't have a leader yet. This is done by scheduling a retry task after a short delay.
- * <p/>
- * The end result from a completed CreateTransaction message is a TransactionContext that is
- * used to perform transaction operations. Transaction operations that occur before the
- * CreateTransaction completes are cached via a DelayedTransactionContextWrapper and executed once the
- * CreateTransaction completes, successfully or not.
- */
-final class RemoteTransactionContextSupport {
- private static final Logger LOG = LoggerFactory.getLogger(RemoteTransactionContextSupport.class);
-
- private static final long CREATE_TX_TRY_INTERVAL_IN_MS = 1000;
- private static final long MAX_CREATE_TX_MSG_TIMEOUT_IN_MS = 5000;
-
- private final TransactionProxy parent;
- private final String shardName;
-
- /**
- * The target primary shard.
- */
- private volatile PrimaryShardInfo primaryShardInfo;
-
- /**
- * The total timeout for creating a tx on the primary shard.
- */
- private volatile long totalCreateTxTimeout;
-
- private final Timeout createTxMessageTimeout;
-
- private final DelayedTransactionContextWrapper transactionContextWrapper;
-
- RemoteTransactionContextSupport(final DelayedTransactionContextWrapper transactionContextWrapper,
- final TransactionProxy parent, final String shardName) {
- this.parent = requireNonNull(parent);
- this.shardName = shardName;
- this.transactionContextWrapper = transactionContextWrapper;
-
- // For the total create tx timeout, use 2 times the election timeout. This should be enough time for
- // a leader re-election to occur if we happen to hit it in transition.
- totalCreateTxTimeout = parent.getActorUtils().getDatastoreContext().getShardRaftConfig()
- .getElectionTimeOutInterval().toMillis() * 2;
-
- // We'll use the operationTimeout for the the create Tx message timeout so it can be set appropriately
- // for unit tests but cap it at MAX_CREATE_TX_MSG_TIMEOUT_IN_MS. The operationTimeout could be set
- // larger than the totalCreateTxTimeout in production which we don't want.
- long operationTimeout = parent.getActorUtils().getOperationTimeout().duration().toMillis();
- createTxMessageTimeout = new Timeout(Math.min(operationTimeout, MAX_CREATE_TX_MSG_TIMEOUT_IN_MS),
- TimeUnit.MILLISECONDS);
- }
-
- String getShardName() {
- return shardName;
- }
-
- private TransactionType getTransactionType() {
- return parent.getType();
- }
-
- private ActorUtils getActorUtils() {
- return parent.getActorUtils();
- }
-
- private TransactionIdentifier getIdentifier() {
- return parent.getIdentifier();
- }
-
- /**
- * Sets the target primary shard and initiates a CreateTransaction try.
- */
- void setPrimaryShard(final PrimaryShardInfo newPrimaryShardInfo) {
- primaryShardInfo = newPrimaryShardInfo;
-
- if (getTransactionType() == TransactionType.WRITE_ONLY
- && getActorUtils().getDatastoreContext().isWriteOnlyTransactionOptimizationsEnabled()) {
- ActorSelection primaryShard = newPrimaryShardInfo.getPrimaryShardActor();
-
- LOG.debug("Tx {} Primary shard {} found - creating WRITE_ONLY transaction context",
- getIdentifier(), primaryShard);
-
- // For write-only Tx's we prepare the transaction modifications directly on the shard actor
- // to avoid the overhead of creating a separate transaction actor.
- transactionContextWrapper.executePriorTransactionOperations(createValidTransactionContext(
- primaryShard, String.valueOf(primaryShard.path()), newPrimaryShardInfo.getPrimaryShardVersion()));
- } else {
- tryCreateTransaction();
- }
- }
-
- /**
- Performs a CreateTransaction try async.
- */
- private void tryCreateTransaction() {
- LOG.debug("Tx {} Primary shard {} found - trying create transaction", getIdentifier(),
- primaryShardInfo.getPrimaryShardActor());
-
- Object serializedCreateMessage = new CreateTransaction(getIdentifier(), getTransactionType().ordinal(),
- primaryShardInfo.getPrimaryShardVersion()).toSerializable();
-
- Future<Object> createTxFuture = getActorUtils().executeOperationAsync(
- primaryShardInfo.getPrimaryShardActor(), serializedCreateMessage, createTxMessageTimeout);
-
- createTxFuture.onComplete(new OnComplete<>() {
- @Override
- public void onComplete(final Throwable failure, final Object response) {
- onCreateTransactionComplete(failure, response);
- }
- }, getActorUtils().getClientDispatcher());
- }
-
- private void tryFindPrimaryShard() {
- LOG.debug("Tx {} Retrying findPrimaryShardAsync for shard {}", getIdentifier(), shardName);
-
- primaryShardInfo = null;
- Future<PrimaryShardInfo> findPrimaryFuture = getActorUtils().findPrimaryShardAsync(shardName);
- findPrimaryFuture.onComplete(new OnComplete<PrimaryShardInfo>() {
- @Override
- public void onComplete(final Throwable failure, final PrimaryShardInfo newPrimaryShardInfo) {
- onFindPrimaryShardComplete(failure, newPrimaryShardInfo);
- }
- }, getActorUtils().getClientDispatcher());
- }
-
- private void onFindPrimaryShardComplete(final Throwable failure, final PrimaryShardInfo newPrimaryShardInfo) {
- if (failure == null) {
- primaryShardInfo = newPrimaryShardInfo;
- tryCreateTransaction();
- } else {
- LOG.debug("Tx {}: Find primary for shard {} failed", getIdentifier(), shardName, failure);
-
- onCreateTransactionComplete(failure, null);
- }
- }
-
- private void onCreateTransactionComplete(final Throwable failure, final Object response) {
- // An AskTimeoutException will occur if the local shard forwards to an unavailable remote leader or
- // the cached remote leader actor is no longer available.
- boolean retryCreateTransaction = primaryShardInfo != null
- && (failure instanceof NoShardLeaderException || failure instanceof AskTimeoutException);
-
- // Schedule a retry unless we're out of retries. Note: totalCreateTxTimeout is volatile as it may
- // be written by different threads however not concurrently, therefore decrementing it
- // non-atomically here is ok.
- if (retryCreateTransaction && totalCreateTxTimeout > 0) {
- long scheduleInterval = CREATE_TX_TRY_INTERVAL_IN_MS;
- if (failure instanceof AskTimeoutException) {
- // Since we use the createTxMessageTimeout for the CreateTransaction request and it timed
- // out, subtract it from the total timeout. Also since the createTxMessageTimeout period
- // has already elapsed, we can immediately schedule the retry (10 ms is virtually immediate).
- totalCreateTxTimeout -= createTxMessageTimeout.duration().toMillis();
- scheduleInterval = 10;
- }
-
- totalCreateTxTimeout -= scheduleInterval;
-
- LOG.debug("Tx {}: create tx on shard {} failed with exception \"{}\" - scheduling retry in {} ms",
- getIdentifier(), shardName, failure, scheduleInterval);
-
- getActorUtils().getActorSystem().scheduler().scheduleOnce(
- FiniteDuration.create(scheduleInterval, TimeUnit.MILLISECONDS),
- this::tryFindPrimaryShard, getActorUtils().getClientDispatcher());
- return;
- }
-
- createTransactionContext(failure, response);
- }
-
- private void createTransactionContext(final Throwable failure, final Object response) {
- // Create the TransactionContext from the response or failure. Store the new
- // TransactionContext locally until we've completed invoking the
- // TransactionOperations. This avoids thread timing issues which could cause
- // out-of-order TransactionOperations. Eg, on a modification operation, if the
- // TransactionContext is non-null, then we directly call the TransactionContext.
- // However, at the same time, the code may be executing the cached
- // TransactionOperations. So to avoid thus timing, we don't publish the
- // TransactionContext until after we've executed all cached TransactionOperations.
- TransactionContext localTransactionContext;
- if (failure != null) {
- LOG.debug("Tx {} Creating NoOpTransaction because of error", getIdentifier(), failure);
-
- Throwable resultingEx = failure;
- if (failure instanceof AskTimeoutException) {
- resultingEx = new ShardLeaderNotRespondingException(String.format(
- "Could not create a %s transaction on shard %s. The shard leader isn't responding.",
- parent.getType(), shardName), failure);
- } else if (!(failure instanceof NoShardLeaderException)) {
- resultingEx = new Exception(String.format(
- "Error creating %s transaction on shard %s", parent.getType(), shardName), failure);
- }
-
- localTransactionContext = new NoOpTransactionContext(resultingEx, getIdentifier());
- } else if (CreateTransactionReply.isSerializedType(response)) {
- localTransactionContext = createValidTransactionContext(
- CreateTransactionReply.fromSerializable(response));
- } else {
- IllegalArgumentException exception = new IllegalArgumentException(String.format(
- "Invalid reply type %s for CreateTransaction", response.getClass()));
-
- localTransactionContext = new NoOpTransactionContext(exception, getIdentifier());
- }
- transactionContextWrapper.executePriorTransactionOperations(localTransactionContext);
- }
-
- private TransactionContext createValidTransactionContext(final CreateTransactionReply reply) {
- LOG.debug("Tx {} Received {}", getIdentifier(), reply);
-
- return createValidTransactionContext(getActorUtils().actorSelection(reply.getTransactionPath()),
- reply.getTransactionPath(), primaryShardInfo.getPrimaryShardVersion());
- }
-
- private TransactionContext createValidTransactionContext(final ActorSelection transactionActor,
- final String transactionPath, final short remoteTransactionVersion) {
- final TransactionContext ret = new RemoteTransactionContext(transactionContextWrapper.getIdentifier(),
- transactionActor, getActorUtils(), remoteTransactionVersion, transactionContextWrapper.getLimiter());
-
- if (parent.getType() == TransactionType.READ_ONLY) {
- TransactionContextCleanup.track(parent, ret);
- }
-
- return ret;
- }
-}
-
import org.opendaylight.controller.cluster.raft.messages.Payload;
import org.opendaylight.controller.cluster.raft.messages.RequestLeadership;
import org.opendaylight.controller.cluster.raft.messages.ServerRemoved;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.config.distributed.datastore.provider.rev140612.DataStoreProperties.ExportOnRecovery;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.config.distributed.datastore.provider.rev231229.DataStoreProperties.ExportOnRecovery;
import org.opendaylight.yangtools.concepts.Identifier;
import org.opendaylight.yangtools.yang.data.tree.api.DataTree;
import org.opendaylight.yangtools.yang.data.tree.api.DataValidationFailedException;
+++ /dev/null
-/*
- * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-package org.opendaylight.controller.cluster.datastore;
-
-import static java.util.Objects.requireNonNull;
-
-import akka.dispatch.OnComplete;
-import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.SettableFuture;
-import java.util.List;
-import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
-import org.opendaylight.controller.cluster.datastore.utils.ActorUtils;
-import org.opendaylight.mdsal.common.api.CommitInfo;
-import org.opendaylight.mdsal.dom.spi.store.DOMStoreThreePhaseCommitCohort;
-import org.opendaylight.yangtools.yang.common.Empty;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import scala.concurrent.Future;
-
-/**
- * A cohort proxy implementation for a single-shard transaction commit. If the transaction was a direct commit
- * to the shard, this implementation elides the CanCommitTransaction and CommitTransaction messages to the
- * shard as an optimization.
- *
- * @author Thomas Pantelis
- */
-class SingleCommitCohortProxy extends AbstractThreePhaseCommitCohort<Object> {
- private static final Logger LOG = LoggerFactory.getLogger(SingleCommitCohortProxy.class);
-
- private final ActorUtils actorUtils;
- private final Future<Object> cohortFuture;
- private final TransactionIdentifier transactionId;
- private volatile DOMStoreThreePhaseCommitCohort delegateCohort = NoOpDOMStoreThreePhaseCommitCohort.INSTANCE;
- private final OperationCallback.Reference operationCallbackRef;
-
- SingleCommitCohortProxy(final ActorUtils actorUtils, final Future<Object> cohortFuture,
- final TransactionIdentifier transactionId, final OperationCallback.Reference operationCallbackRef) {
- this.actorUtils = actorUtils;
- this.cohortFuture = cohortFuture;
- this.transactionId = requireNonNull(transactionId);
- this.operationCallbackRef = operationCallbackRef;
- }
-
- @Override
- public ListenableFuture<Boolean> canCommit() {
- LOG.debug("Tx {} canCommit", transactionId);
-
- final SettableFuture<Boolean> returnFuture = SettableFuture.create();
-
- cohortFuture.onComplete(new OnComplete<>() {
- @Override
- public void onComplete(final Throwable failure, final Object cohortResponse) {
- if (failure != null) {
- operationCallbackRef.get().failure();
- returnFuture.setException(failure);
- return;
- }
-
- operationCallbackRef.get().success();
-
- LOG.debug("Tx {} successfully completed direct commit", transactionId);
-
- // The Future was the result of a direct commit to the shard, essentially eliding the
- // front-end 3PC coordination. We don't really care about the specific Future
- // response object, only that it completed successfully. At this point the Tx is complete
- // so return true. The subsequent preCommit and commit phases will be no-ops, ie return
- // immediate success, to complete the 3PC for the front-end.
- returnFuture.set(Boolean.TRUE);
- }
- }, actorUtils.getClientDispatcher());
-
- return returnFuture;
- }
-
- @Override
- public ListenableFuture<Empty> preCommit() {
- return delegateCohort.preCommit();
- }
-
- @Override
- public ListenableFuture<Empty> abort() {
- return delegateCohort.abort();
- }
-
- @Override
- public ListenableFuture<? extends CommitInfo> commit() {
- return delegateCohort.commit();
- }
-
- @Override
- List<Future<Object>> getCohortFutures() {
- return List.of(cohortFuture);
- }
-}
+++ /dev/null
-/*
- * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-package org.opendaylight.controller.cluster.datastore;
-
-import static com.google.common.base.Preconditions.checkState;
-import static java.util.Objects.requireNonNull;
-
-import akka.actor.ActorSelection;
-import akka.dispatch.Futures;
-import akka.dispatch.OnComplete;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
-import java.util.Map.Entry;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
-import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier;
-import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
-import org.opendaylight.controller.cluster.datastore.messages.CloseTransactionChain;
-import org.opendaylight.controller.cluster.datastore.messages.PrimaryShardInfo;
-import org.opendaylight.mdsal.dom.api.DOMTransactionChainClosedException;
-import org.opendaylight.mdsal.dom.spi.store.DOMStoreReadTransaction;
-import org.opendaylight.mdsal.dom.spi.store.DOMStoreReadWriteTransaction;
-import org.opendaylight.mdsal.dom.spi.store.DOMStoreTransactionChain;
-import org.opendaylight.mdsal.dom.spi.store.DOMStoreWriteTransaction;
-import org.opendaylight.yangtools.yang.data.tree.api.ReadOnlyDataTree;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import scala.concurrent.Future;
-import scala.concurrent.Promise;
-
-/**
- * A chain of {@link TransactionProxy}s. It allows a single open transaction to be open
- * at a time. For remote transactions, it also tracks the outstanding readiness requests
- * towards the shard and unblocks operations only after all have completed.
- */
-final class TransactionChainProxy extends AbstractTransactionContextFactory<LocalTransactionChain>
- implements DOMStoreTransactionChain {
- private abstract static class State {
- /**
- * Check if it is okay to allocate a new transaction.
- * @throws IllegalStateException if a transaction may not be allocated.
- */
- abstract void checkReady();
-
- /**
- * Return the future which needs to be waited for before shard information
- * is returned (which unblocks remote transactions).
- * @return Future to wait for, or null of no wait is necessary
- */
- abstract Future<?> previousFuture();
- }
-
- private abstract static class Pending extends State {
- private final TransactionIdentifier transaction;
- private final Future<?> previousFuture;
-
- Pending(final TransactionIdentifier transaction, final Future<?> previousFuture) {
- this.previousFuture = previousFuture;
- this.transaction = requireNonNull(transaction);
- }
-
- @Override
- final Future<?> previousFuture() {
- return previousFuture;
- }
-
- final TransactionIdentifier getIdentifier() {
- return transaction;
- }
- }
-
- private static final class Allocated extends Pending {
- Allocated(final TransactionIdentifier transaction, final Future<?> previousFuture) {
- super(transaction, previousFuture);
- }
-
- @Override
- void checkReady() {
- throw new IllegalStateException(String.format("Previous transaction %s is not ready yet", getIdentifier()));
- }
- }
-
- private static final class Submitted extends Pending {
- Submitted(final TransactionIdentifier transaction, final Future<?> previousFuture) {
- super(transaction, previousFuture);
- }
-
- @Override
- void checkReady() {
- // Okay to allocate
- }
- }
-
- private abstract static class DefaultState extends State {
- @Override
- final Future<?> previousFuture() {
- return null;
- }
- }
-
- private static final State IDLE_STATE = new DefaultState() {
- @Override
- void checkReady() {
- // Okay to allocate
- }
- };
-
- private static final State CLOSED_STATE = new DefaultState() {
- @Override
- void checkReady() {
- throw new DOMTransactionChainClosedException("Transaction chain has been closed");
- }
- };
-
- private static final Logger LOG = LoggerFactory.getLogger(TransactionChainProxy.class);
- private static final AtomicReferenceFieldUpdater<TransactionChainProxy, State> STATE_UPDATER =
- AtomicReferenceFieldUpdater.newUpdater(TransactionChainProxy.class, State.class, "currentState");
-
- private final TransactionContextFactory parent;
- private volatile State currentState = IDLE_STATE;
-
- /**
- * This map holds Promise instances for each read-only tx. It is used to maintain ordering of tx creates
- * wrt to read-only tx's between this class and a LocalTransactionChain since they're bridged by
- * asynchronous futures. Otherwise, in the following scenario, eg:
- * <p/>
- * 1) Create write tx1 on chain
- * 2) do write and submit
- * 3) Create read-only tx2 on chain and issue read
- * 4) Create write tx3 on chain, do write but do not submit
- * <p/>
- * if the sequence/timing is right, tx3 may create its local tx on the LocalTransactionChain before tx2,
- * which results in tx2 failing b/c tx3 isn't ready yet. So maintaining ordering prevents this issue
- * (see Bug 4774).
- * <p/>
- * A Promise is added via newReadOnlyTransaction. When the parent class completes the primary shard
- * lookup and creates the TransactionContext (either success or failure), onTransactionContextCreated is
- * called which completes the Promise. A write tx that is created prior to completion will wait on the
- * Promise's Future via findPrimaryShard.
- */
- private final ConcurrentMap<TransactionIdentifier, Promise<Object>> priorReadOnlyTxPromises =
- new ConcurrentHashMap<>();
-
- TransactionChainProxy(final TransactionContextFactory parent, final LocalHistoryIdentifier historyId) {
- super(parent.getActorUtils(), historyId);
- this.parent = parent;
- }
-
- @Override
- public DOMStoreReadTransaction newReadOnlyTransaction() {
- currentState.checkReady();
- TransactionProxy transactionProxy = new TransactionProxy(this, TransactionType.READ_ONLY);
- priorReadOnlyTxPromises.put(transactionProxy.getIdentifier(), Futures.<Object>promise());
- return transactionProxy;
- }
-
- @Override
- public DOMStoreReadWriteTransaction newReadWriteTransaction() {
- getActorUtils().acquireTxCreationPermit();
- return allocateWriteTransaction(TransactionType.READ_WRITE);
- }
-
- @Override
- public DOMStoreWriteTransaction newWriteOnlyTransaction() {
- getActorUtils().acquireTxCreationPermit();
- return allocateWriteTransaction(TransactionType.WRITE_ONLY);
- }
-
- @Override
- public void close() {
- currentState = CLOSED_STATE;
-
- // Send a close transaction chain request to each and every shard
-
- getActorUtils().broadcast(version -> new CloseTransactionChain(getHistoryId(), version).toSerializable(),
- CloseTransactionChain.class);
- }
-
- private TransactionProxy allocateWriteTransaction(final TransactionType type) {
- State localState = currentState;
- localState.checkReady();
-
- final TransactionProxy ret = new TransactionProxy(this, type);
- currentState = new Allocated(ret.getIdentifier(), localState.previousFuture());
- return ret;
- }
-
- @Override
- protected LocalTransactionChain factoryForShard(final String shardName, final ActorSelection shardLeader,
- final ReadOnlyDataTree dataTree) {
- final LocalTransactionChain ret = new LocalTransactionChain(this, shardLeader, dataTree);
- LOG.debug("Allocated transaction chain {} for shard {} leader {}", ret, shardName, shardLeader);
- return ret;
- }
-
- /**
- * This method is overridden to ensure the previous Tx's ready operations complete
- * before we initiate the next Tx in the chain to avoid creation failures if the
- * previous Tx's ready operations haven't completed yet.
- */
- @SuppressWarnings({ "unchecked", "rawtypes" })
- @Override
- protected Future<PrimaryShardInfo> findPrimaryShard(final String shardName, final TransactionIdentifier txId) {
- // Read current state atomically
- final State localState = currentState;
-
- // There are no outstanding futures, shortcut
- Future<?> previous = localState.previousFuture();
- if (previous == null) {
- return combineFutureWithPossiblePriorReadOnlyTxFutures(parent.findPrimaryShard(shardName, txId), txId);
- }
-
- final String previousTransactionId;
-
- if (localState instanceof Pending) {
- previousTransactionId = ((Pending) localState).getIdentifier().toString();
- LOG.debug("Tx: {} - waiting for ready futures with pending Tx {}", txId, previousTransactionId);
- } else {
- previousTransactionId = "";
- LOG.debug("Waiting for ready futures on chain {}", getHistoryId());
- }
-
- previous = combineFutureWithPossiblePriorReadOnlyTxFutures(previous, txId);
-
- // Add a callback for completion of the combined Futures.
- final Promise<PrimaryShardInfo> returnPromise = Futures.promise();
-
- final OnComplete onComplete = new OnComplete() {
- @Override
- public void onComplete(final Throwable failure, final Object notUsed) {
- if (failure != null) {
- // A Ready Future failed so fail the returned Promise.
- LOG.error("Tx: {} - ready future failed for previous Tx {}", txId, previousTransactionId);
- returnPromise.failure(failure);
- } else {
- LOG.debug("Tx: {} - previous Tx {} readied - proceeding to FindPrimaryShard",
- txId, previousTransactionId);
-
- // Send the FindPrimaryShard message and use the resulting Future to complete the
- // returned Promise.
- returnPromise.completeWith(parent.findPrimaryShard(shardName, txId));
- }
- }
- };
-
- previous.onComplete(onComplete, getActorUtils().getClientDispatcher());
- return returnPromise.future();
- }
-
- private <T> Future<T> combineFutureWithPossiblePriorReadOnlyTxFutures(final Future<T> future,
- final TransactionIdentifier txId) {
- return priorReadOnlyTxPromises.isEmpty() || priorReadOnlyTxPromises.containsKey(txId) ? future
- // Tough luck, we need do some work
- : combineWithPriorReadOnlyTxFutures(future, txId);
- }
-
- // Split out of the common path
- private <T> Future<T> combineWithPriorReadOnlyTxFutures(final Future<T> future, final TransactionIdentifier txId) {
- // Take a stable snapshot, and check if we raced
- final List<Entry<TransactionIdentifier, Promise<Object>>> priorReadOnlyTxPromiseEntries =
- new ArrayList<>(priorReadOnlyTxPromises.entrySet());
- if (priorReadOnlyTxPromiseEntries.isEmpty()) {
- return future;
- }
-
- final List<Future<Object>> priorReadOnlyTxFutures = new ArrayList<>(priorReadOnlyTxPromiseEntries.size());
- for (Entry<TransactionIdentifier, Promise<Object>> entry: priorReadOnlyTxPromiseEntries) {
- LOG.debug("Tx: {} - waiting on future for prior read-only Tx {}", txId, entry.getKey());
- priorReadOnlyTxFutures.add(entry.getValue().future());
- }
-
- final Future<Iterable<Object>> combinedFutures = Futures.sequence(priorReadOnlyTxFutures,
- getActorUtils().getClientDispatcher());
-
- final Promise<T> returnPromise = Futures.promise();
- final OnComplete<Iterable<Object>> onComplete = new OnComplete<>() {
- @Override
- public void onComplete(final Throwable failure, final Iterable<Object> notUsed) {
- LOG.debug("Tx: {} - prior read-only Tx futures complete", txId);
-
- // Complete the returned Promise with the original Future.
- returnPromise.completeWith(future);
- }
- };
-
- combinedFutures.onComplete(onComplete, getActorUtils().getClientDispatcher());
- return returnPromise.future();
- }
-
- @Override
- protected <T> void onTransactionReady(final TransactionIdentifier transaction,
- final Collection<Future<T>> cohortFutures) {
- final State localState = currentState;
- checkState(localState instanceof Allocated, "Readying transaction %s while state is %s", transaction,
- localState);
- final TransactionIdentifier currentTx = ((Allocated)localState).getIdentifier();
- checkState(transaction.equals(currentTx), "Readying transaction %s while %s is allocated", transaction,
- currentTx);
-
- // Transaction ready and we are not waiting for futures -- go to idle
- if (cohortFutures.isEmpty()) {
- currentState = IDLE_STATE;
- return;
- }
-
- // Combine the ready Futures into 1
- final Future<Iterable<T>> combined = Futures.sequence(cohortFutures, getActorUtils().getClientDispatcher());
-
- // Record the we have outstanding futures
- final State newState = new Submitted(transaction, combined);
- currentState = newState;
-
- // Attach a completion reset, but only if we do not allocate a transaction
- // in-between
- combined.onComplete(new OnComplete<Iterable<T>>() {
- @Override
- public void onComplete(final Throwable arg0, final Iterable<T> arg1) {
- STATE_UPDATER.compareAndSet(TransactionChainProxy.this, newState, IDLE_STATE);
- }
- }, getActorUtils().getClientDispatcher());
- }
-
- @Override
- protected void onTransactionContextCreated(final TransactionIdentifier transactionId) {
- Promise<Object> promise = priorReadOnlyTxPromises.remove(transactionId);
- if (promise != null) {
- promise.success(null);
- }
- }
-}
+++ /dev/null
-/*
- * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-package org.opendaylight.controller.cluster.datastore;
-
-import akka.actor.ActorSelection;
-import com.google.common.util.concurrent.SettableFuture;
-import java.util.Optional;
-import java.util.SortedSet;
-import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
-import org.opendaylight.controller.cluster.datastore.messages.AbstractRead;
-import org.opendaylight.yangtools.concepts.AbstractSimpleIdentifiable;
-import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
-import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import scala.concurrent.Future;
-
-abstract class TransactionContext extends AbstractSimpleIdentifiable<TransactionIdentifier> {
- private static final Logger LOG = LoggerFactory.getLogger(TransactionContext.class);
-
- private final short transactionVersion;
-
- private long modificationCount = 0;
- private boolean handOffComplete;
-
- TransactionContext(final TransactionIdentifier transactionIdentifier) {
- this(transactionIdentifier, DataStoreVersions.CURRENT_VERSION);
- }
-
- TransactionContext(final TransactionIdentifier transactionIdentifier, final short transactionVersion) {
- super(transactionIdentifier);
- this.transactionVersion = transactionVersion;
- }
-
- final short getTransactionVersion() {
- return transactionVersion;
- }
-
- final void incrementModificationCount() {
- modificationCount++;
- }
-
- final void logModificationCount() {
- LOG.debug("Total modifications on Tx {} = [ {} ]", getIdentifier(), modificationCount);
- }
-
- /**
- * Invoked by {@link AbstractTransactionContextWrapper} when it has finished handing
- * off operations to this context. From this point on, the context is responsible
- * for throttling operations.
- *
- * <p>
- * Implementations can rely on the wrapper calling this operation in a synchronized
- * block, so they do not need to ensure visibility of this state transition themselves.
- */
- final void operationHandOffComplete() {
- handOffComplete = true;
- }
-
- final boolean isOperationHandOffComplete() {
- return handOffComplete;
- }
-
- /**
- * A TransactionContext that uses operation limiting should return true else false.
- *
- * @return true if operation limiting is used, false otherwise
- */
- boolean usesOperationLimiting() {
- return false;
- }
-
- abstract void executeDelete(YangInstanceIdentifier path, Boolean havePermit);
-
- abstract void executeMerge(YangInstanceIdentifier path, NormalizedNode data, Boolean havePermit);
-
- abstract void executeWrite(YangInstanceIdentifier path, NormalizedNode data, Boolean havePermit);
-
- abstract <T> void executeRead(AbstractRead<T> readCmd, SettableFuture<T> proxyFuture, Boolean havePermit);
-
- abstract Future<ActorSelection> readyTransaction(Boolean havePermit,
- Optional<SortedSet<String>> participatingShardNames);
-
- abstract Future<Object> directCommit(Boolean havePermit);
-
- abstract void closeTransaction();
-}
+++ /dev/null
-/*
- * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-package org.opendaylight.controller.cluster.datastore;
-
-import com.google.common.base.FinalizablePhantomReference;
-import com.google.common.base.FinalizableReferenceQueue;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * A PhantomReference that closes remote transactions for a TransactionContext when it's
- * garbage collected. This is used for read-only transactions as they're not explicitly closed
- * by clients. So the only way to detect that a transaction is no longer in use and it's safe
- * to clean up is when it's garbage collected. It's inexact as to when an instance will be GC'ed
- * but TransactionProxy instances should generally be short-lived enough to avoid being moved
- * to the old generation space and thus should be cleaned up in a timely manner as the GC
- * runs on the young generation (eden, swap1...) space much more frequently.
- */
-final class TransactionContextCleanup extends FinalizablePhantomReference<TransactionProxy> {
- private static final Logger LOG = LoggerFactory.getLogger(TransactionContextCleanup.class);
- /**
- * Used to enqueue the PhantomReferences for read-only TransactionProxy instances. The
- * FinalizableReferenceQueue is safe to use statically in an OSGi environment as it uses some
- * trickery to clean up its internal thread when the bundle is unloaded.
- */
- private static final FinalizableReferenceQueue QUEUE = new FinalizableReferenceQueue();
-
- /**
- * This stores the TransactionProxyCleanupPhantomReference instances statically, This is
- * necessary because PhantomReferences need a hard reference so they're not garbage collected.
- * Once finalized, the TransactionProxyCleanupPhantomReference removes itself from this map
- * and thus becomes eligible for garbage collection.
- */
- private static final Map<TransactionContext, TransactionContextCleanup> CACHE = new ConcurrentHashMap<>();
-
- private final TransactionContext cleanup;
-
- private TransactionContextCleanup(final TransactionProxy referent, final TransactionContext cleanup) {
- super(referent, QUEUE);
- this.cleanup = cleanup;
- }
-
- static void track(final TransactionProxy referent, final TransactionContext cleanup) {
- final TransactionContextCleanup ret = new TransactionContextCleanup(referent, cleanup);
- CACHE.put(cleanup, ret);
- }
-
- @Override
- public void finalizeReferent() {
- LOG.trace("Cleaning up {} Tx actors", cleanup);
-
- if (CACHE.remove(cleanup) != null) {
- cleanup.closeTransaction();
- }
- }
-
- static void untrack(final TransactionContext cleanup) {
- CACHE.remove(cleanup);
- }
-}
\ No newline at end of file
+++ /dev/null
-/*
- * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-package org.opendaylight.controller.cluster.datastore;
-
-import akka.actor.ActorSelection;
-import java.util.Collection;
-import java.util.concurrent.atomic.AtomicLong;
-import org.opendaylight.controller.cluster.access.concepts.ClientIdentifier;
-import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier;
-import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
-import org.opendaylight.controller.cluster.datastore.messages.PrimaryShardInfo;
-import org.opendaylight.controller.cluster.datastore.utils.ActorUtils;
-import org.opendaylight.mdsal.dom.spi.store.DOMStoreTransactionChain;
-import org.opendaylight.yangtools.yang.data.tree.api.ReadOnlyDataTree;
-import scala.concurrent.Future;
-
-/**
- * An {@link AbstractTransactionContextFactory} which produces TransactionContext instances for single
- * transactions (ie not chained).
- */
-final class TransactionContextFactory extends AbstractTransactionContextFactory<LocalTransactionFactoryImpl> {
- private final AtomicLong nextHistory = new AtomicLong(1);
-
- TransactionContextFactory(final ActorUtils actorUtils, final ClientIdentifier clientId) {
- super(actorUtils, new LocalHistoryIdentifier(clientId, 0));
- }
-
- @Override
- public void close() {
- }
-
- @Override
- protected LocalTransactionFactoryImpl factoryForShard(final String shardName, final ActorSelection shardLeader,
- final ReadOnlyDataTree dataTree) {
- return new LocalTransactionFactoryImpl(getActorUtils(), shardLeader, dataTree);
- }
-
- @Override
- protected Future<PrimaryShardInfo> findPrimaryShard(final String shardName, final TransactionIdentifier txId) {
- return getActorUtils().findPrimaryShardAsync(shardName);
- }
-
- @Override
- protected <T> void onTransactionReady(final TransactionIdentifier transaction,
- final Collection<Future<T>> cohortFutures) {
- // Transactions are disconnected, this is a no-op
- }
-
- DOMStoreTransactionChain createTransactionChain() {
- return new TransactionChainProxy(this, new LocalHistoryIdentifier(getHistoryId().getClientId(),
- nextHistory.getAndIncrement()));
- }
-
- @Override
- protected void onTransactionContextCreated(final TransactionIdentifier transactionId) {
- }
-}
+++ /dev/null
-/*
- * Copyright (c) 2020 PANTHEON.tech, s.r.o. and others. All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-package org.opendaylight.controller.cluster.datastore;
-
-import static java.util.Objects.requireNonNull;
-
-import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
-import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
-
-/**
- * A TransactionOperation to apply a specific modification. Subclasses provide type capture of required data, so that
- * we instantiate AbstractModification subclasses for the bare minimum time required.
- */
-abstract class TransactionModificationOperation extends TransactionOperation {
- private abstract static class AbstractDataOperation extends TransactionModificationOperation {
- private final NormalizedNode data;
-
- AbstractDataOperation(final YangInstanceIdentifier path, final NormalizedNode data) {
- super(path);
- this.data = requireNonNull(data);
- }
-
- final NormalizedNode data() {
- return data;
- }
- }
-
- static final class DeleteOperation extends TransactionModificationOperation {
- DeleteOperation(final YangInstanceIdentifier path) {
- super(path);
- }
-
- @Override
- protected void invoke(final TransactionContext transactionContext, final Boolean havePermit) {
- transactionContext.executeDelete(path(), havePermit);
- }
- }
-
- static final class MergeOperation extends AbstractDataOperation {
- MergeOperation(final YangInstanceIdentifier path, final NormalizedNode data) {
- super(path, data);
- }
-
- @Override
- protected void invoke(final TransactionContext transactionContext, final Boolean havePermit) {
- transactionContext.executeMerge(path(), data(), havePermit);
- }
- }
-
- static final class WriteOperation extends AbstractDataOperation {
- WriteOperation(final YangInstanceIdentifier path, final NormalizedNode data) {
- super(path, data);
- }
-
- @Override
- protected void invoke(final TransactionContext transactionContext, final Boolean havePermit) {
- transactionContext.executeWrite(path(), data(), havePermit);
- }
- }
-
- private final YangInstanceIdentifier path;
-
- TransactionModificationOperation(final YangInstanceIdentifier path) {
- this.path = requireNonNull(path);
- }
-
- final YangInstanceIdentifier path() {
- return path;
- }
-}
+++ /dev/null
-/*
- * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-package org.opendaylight.controller.cluster.datastore;
-
-import org.eclipse.jdt.annotation.Nullable;
-
-/**
- * Abstract superclass for transaction operations which should be executed
- * on a {@link TransactionContext} at a later point in time.
- */
-abstract class TransactionOperation {
- /**
- * Execute the delayed operation.
- *
- * @param transactionContext the TransactionContext
- * @param havePermit Boolean indicator if this operation has tried and acquired a permit, null if there was no
- * attempt to acquire a permit.
- */
- protected abstract void invoke(TransactionContext transactionContext, @Nullable Boolean havePermit);
-}
+++ /dev/null
-/*
- * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-package org.opendaylight.controller.cluster.datastore;
-
-import static com.google.common.base.Preconditions.checkState;
-import static java.util.Objects.requireNonNull;
-
-import akka.actor.ActorSelection;
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.collect.Iterables;
-import com.google.common.util.concurrent.FluentFuture;
-import com.google.common.util.concurrent.SettableFuture;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Optional;
-import java.util.SortedSet;
-import java.util.TreeMap;
-import java.util.TreeSet;
-import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
-import org.opendaylight.controller.cluster.datastore.TransactionModificationOperation.DeleteOperation;
-import org.opendaylight.controller.cluster.datastore.TransactionModificationOperation.MergeOperation;
-import org.opendaylight.controller.cluster.datastore.TransactionModificationOperation.WriteOperation;
-import org.opendaylight.controller.cluster.datastore.messages.AbstractRead;
-import org.opendaylight.controller.cluster.datastore.messages.DataExists;
-import org.opendaylight.controller.cluster.datastore.messages.ReadData;
-import org.opendaylight.controller.cluster.datastore.utils.ActorUtils;
-import org.opendaylight.controller.cluster.datastore.utils.RootScatterGather;
-import org.opendaylight.mdsal.dom.spi.store.AbstractDOMStoreTransaction;
-import org.opendaylight.mdsal.dom.spi.store.DOMStoreReadWriteTransaction;
-import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
-import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.PathArgument;
-import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
-import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import scala.concurrent.Future;
-import scala.concurrent.Promise;
-
-/**
- * A transaction potentially spanning multiple backend shards.
- */
-public class TransactionProxy extends AbstractDOMStoreTransaction<TransactionIdentifier>
- implements DOMStoreReadWriteTransaction {
- private enum TransactionState {
- OPEN,
- READY,
- CLOSED,
- }
-
- private static final Logger LOG = LoggerFactory.getLogger(TransactionProxy.class);
- private static final DeleteOperation ROOT_DELETE_OPERATION = new DeleteOperation(YangInstanceIdentifier.of());
-
- private final Map<String, AbstractTransactionContextWrapper> txContextWrappers = new TreeMap<>();
- private final AbstractTransactionContextFactory<?> txContextFactory;
- private final TransactionType type;
- private TransactionState state = TransactionState.OPEN;
-
- @VisibleForTesting
- public TransactionProxy(final AbstractTransactionContextFactory<?> txContextFactory, final TransactionType type) {
- super(txContextFactory.nextIdentifier(), txContextFactory.getActorUtils().getDatastoreContext()
- .isTransactionDebugContextEnabled());
- this.txContextFactory = txContextFactory;
- this.type = requireNonNull(type);
-
- LOG.debug("New {} Tx - {}", type, getIdentifier());
- }
-
- @Override
- public FluentFuture<Boolean> exists(final YangInstanceIdentifier path) {
- return executeRead(shardNameFromIdentifier(path), new DataExists(path, DataStoreVersions.CURRENT_VERSION));
- }
-
- private <T> FluentFuture<T> executeRead(final String shardName, final AbstractRead<T> readCmd) {
- checkState(type != TransactionType.WRITE_ONLY, "Reads from write-only transactions are not allowed");
-
- LOG.trace("Tx {} {} {}", getIdentifier(), readCmd.getClass().getSimpleName(), readCmd.getPath());
-
- final SettableFuture<T> proxyFuture = SettableFuture.create();
- AbstractTransactionContextWrapper contextWrapper = wrapperFromShardName(shardName);
- contextWrapper.maybeExecuteTransactionOperation(new TransactionOperation() {
- @Override
- public void invoke(final TransactionContext transactionContext, final Boolean havePermit) {
- transactionContext.executeRead(readCmd, proxyFuture, havePermit);
- }
- });
-
- return FluentFuture.from(proxyFuture);
- }
-
- @Override
- public FluentFuture<Optional<NormalizedNode>> read(final YangInstanceIdentifier path) {
- checkState(type != TransactionType.WRITE_ONLY, "Reads from write-only transactions are not allowed");
- requireNonNull(path, "path should not be null");
-
- LOG.trace("Tx {} read {}", getIdentifier(), path);
- return path.isEmpty() ? readAllData() : singleShardRead(shardNameFromIdentifier(path), path);
- }
-
- private FluentFuture<Optional<NormalizedNode>> singleShardRead(final String shardName,
- final YangInstanceIdentifier path) {
- return executeRead(shardName, new ReadData(path, DataStoreVersions.CURRENT_VERSION));
- }
-
- private FluentFuture<Optional<NormalizedNode>> readAllData() {
- final var actorUtils = getActorUtils();
- return RootScatterGather.gather(actorUtils, actorUtils.getConfiguration().getAllShardNames().stream()
- .map(shardName -> singleShardRead(shardName, YangInstanceIdentifier.of())));
- }
-
- @Override
- public void delete(final YangInstanceIdentifier path) {
- checkModificationState("delete", path);
-
- if (path.isEmpty()) {
- deleteAllData();
- } else {
- executeModification(new DeleteOperation(path));
- }
- }
-
- private void deleteAllData() {
- for (String shardName : getActorUtils().getConfiguration().getAllShardNames()) {
- wrapperFromShardName(shardName).maybeExecuteTransactionOperation(ROOT_DELETE_OPERATION);
- }
- }
-
- @Override
- public void merge(final YangInstanceIdentifier path, final NormalizedNode data) {
- checkModificationState("merge", path);
-
- if (path.isEmpty()) {
- mergeAllData(RootScatterGather.castRootNode(data));
- } else {
- executeModification(new MergeOperation(path, data));
- }
- }
-
- private void mergeAllData(final ContainerNode rootData) {
- if (!rootData.isEmpty()) {
- RootScatterGather.scatterTouched(rootData, this::wrapperFromRootChild).forEach(
- scattered -> scattered.shard().maybeExecuteTransactionOperation(
- new MergeOperation(YangInstanceIdentifier.of(), scattered.container())));
- }
- }
-
- @Override
- public void write(final YangInstanceIdentifier path, final NormalizedNode data) {
- checkModificationState("write", path);
-
- if (path.isEmpty()) {
- writeAllData(RootScatterGather.castRootNode(data));
- } else {
- executeModification(new WriteOperation(path, data));
- }
- }
-
- private void writeAllData(final ContainerNode rootData) {
- RootScatterGather.scatterAll(rootData, this::wrapperFromRootChild,
- getActorUtils().getConfiguration().getAllShardNames().stream().map(this::wrapperFromShardName)).forEach(
- scattered -> scattered.shard().maybeExecuteTransactionOperation(
- new WriteOperation(YangInstanceIdentifier.of(), scattered.container())));
- }
-
- private void executeModification(final TransactionModificationOperation operation) {
- wrapperFromShardName(shardNameFromIdentifier(operation.path())).maybeExecuteTransactionOperation(operation);
- }
-
- private void checkModificationState(final String opName, final YangInstanceIdentifier path) {
- checkState(type != TransactionType.READ_ONLY, "Modification operation on read-only transaction is not allowed");
- checkState(state == TransactionState.OPEN, "Transaction is sealed - further modifications are not allowed");
- LOG.trace("Tx {} {} {}", getIdentifier(), opName, path);
- }
-
- private boolean seal(final TransactionState newState) {
- if (state == TransactionState.OPEN) {
- state = newState;
- return true;
- }
- return false;
- }
-
- @Override
- public final void close() {
- if (!seal(TransactionState.CLOSED)) {
- checkState(state == TransactionState.CLOSED, "Transaction %s is ready, it cannot be closed",
- getIdentifier());
- // Idempotent no-op as per AutoCloseable recommendation
- return;
- }
-
- for (AbstractTransactionContextWrapper contextWrapper : txContextWrappers.values()) {
- contextWrapper.maybeExecuteTransactionOperation(new TransactionOperation() {
- @Override
- public void invoke(final TransactionContext transactionContext, final Boolean havePermit) {
- transactionContext.closeTransaction();
- }
- });
- }
-
-
- txContextWrappers.clear();
- }
-
- @Override
- public final AbstractThreePhaseCommitCohort<?> ready() {
- checkState(type != TransactionType.READ_ONLY, "Read-only transactions cannot be readied");
-
- final boolean success = seal(TransactionState.READY);
- checkState(success, "Transaction %s is %s, it cannot be readied", getIdentifier(), state);
-
- LOG.debug("Tx {} Readying {} components for commit", getIdentifier(), txContextWrappers.size());
-
- final AbstractThreePhaseCommitCohort<?> ret = switch (txContextWrappers.size()) {
- case 0 -> NoOpDOMStoreThreePhaseCommitCohort.INSTANCE;
- case 1 -> {
- final Entry<String, AbstractTransactionContextWrapper> e = Iterables.getOnlyElement(
- txContextWrappers.entrySet());
- yield createSingleCommitCohort(e.getKey(), e.getValue());
- }
- default -> createMultiCommitCohort();
- };
- txContextFactory.onTransactionReady(getIdentifier(), ret.getCohortFutures());
-
- final Throwable debugContext = getDebugContext();
- return debugContext == null ? ret : new DebugThreePhaseCommitCohort(getIdentifier(), ret, debugContext);
- }
-
- @SuppressWarnings({ "rawtypes", "unchecked" })
- private AbstractThreePhaseCommitCohort<?> createSingleCommitCohort(final String shardName,
- final AbstractTransactionContextWrapper contextWrapper) {
-
- LOG.debug("Tx {} Readying transaction for shard {}", getIdentifier(), shardName);
-
- final OperationCallback.Reference operationCallbackRef =
- new OperationCallback.Reference(OperationCallback.NO_OP_CALLBACK);
-
- final TransactionContext transactionContext = contextWrapper.getTransactionContext();
- final Future future;
- if (transactionContext == null) {
- final Promise promise = akka.dispatch.Futures.promise();
- contextWrapper.maybeExecuteTransactionOperation(new TransactionOperation() {
- @Override
- public void invoke(final TransactionContext newTransactionContext, final Boolean havePermit) {
- promise.completeWith(getDirectCommitFuture(newTransactionContext, operationCallbackRef,
- havePermit));
- }
- });
- future = promise.future();
- } else {
- // avoid the creation of a promise and a TransactionOperation
- future = getDirectCommitFuture(transactionContext, operationCallbackRef, null);
- }
-
- return new SingleCommitCohortProxy(txContextFactory.getActorUtils(), future, getIdentifier(),
- operationCallbackRef);
- }
-
- private Future<?> getDirectCommitFuture(final TransactionContext transactionContext,
- final OperationCallback.Reference operationCallbackRef, final Boolean havePermit) {
- TransactionRateLimitingCallback rateLimitingCallback = new TransactionRateLimitingCallback(
- txContextFactory.getActorUtils());
- operationCallbackRef.set(rateLimitingCallback);
- rateLimitingCallback.run();
- return transactionContext.directCommit(havePermit);
- }
-
- private AbstractThreePhaseCommitCohort<ActorSelection> createMultiCommitCohort() {
-
- final List<ThreePhaseCommitCohortProxy.CohortInfo> cohorts = new ArrayList<>(txContextWrappers.size());
- final Optional<SortedSet<String>> shardNames = Optional.of(new TreeSet<>(txContextWrappers.keySet()));
- for (Entry<String, AbstractTransactionContextWrapper> e : txContextWrappers.entrySet()) {
- LOG.debug("Tx {} Readying transaction for shard {}", getIdentifier(), e.getKey());
-
- final AbstractTransactionContextWrapper wrapper = e.getValue();
-
- // The remote tx version is obtained the via TransactionContext which may not be available yet so
- // we pass a Supplier to dynamically obtain it. Once the ready Future is resolved the
- // TransactionContext is available.
- cohorts.add(new ThreePhaseCommitCohortProxy.CohortInfo(wrapper.readyTransaction(shardNames),
- () -> wrapper.getTransactionContext().getTransactionVersion()));
- }
-
- return new ThreePhaseCommitCohortProxy(txContextFactory.getActorUtils(), cohorts, getIdentifier());
- }
-
- private String shardNameFromIdentifier(final YangInstanceIdentifier path) {
- return getActorUtils().getShardStrategyFactory().getStrategy(path).findShard(path);
- }
-
- private AbstractTransactionContextWrapper wrapperFromRootChild(final PathArgument childId) {
- return wrapperFromShardName(shardNameFromIdentifier(YangInstanceIdentifier.of(childId)));
- }
-
- private AbstractTransactionContextWrapper wrapperFromShardName(final String shardName) {
- final AbstractTransactionContextWrapper existing = txContextWrappers.get(shardName);
- if (existing != null) {
- return existing;
- }
-
- final AbstractTransactionContextWrapper fresh = txContextFactory.newTransactionContextWrapper(this, shardName);
- txContextWrappers.put(shardName, fresh);
- return fresh;
- }
-
- TransactionType getType() {
- return type;
- }
-
- boolean isReady() {
- return state != TransactionState.OPEN;
- }
-
- final ActorUtils getActorUtils() {
- return txContextFactory.getActorUtils();
- }
-}
// vi: set smarttab et sw=4 tabstop=4:
module distributed-datastore-provider {
-
yang-version 1;
namespace "urn:opendaylight:params:xml:ns:yang:controller:config:distributed-datastore-provider";
prefix "distributed-datastore-provider";
description
"This module contains the base YANG definitions for
- the distributed datastore provider implementation";
+ the distributed datastore provider implementation";
+
+ revision "2023-12-29" {
+ description "Remote use-tell-based-protocol leaf";
+ }
revision "2014-06-12" {
description
maximum size in bytes for a message slice.";
}
- leaf use-tell-based-protocol {
- status obsolete;
- default false;
- type boolean;
- description "Use a newer protocol between the frontend and backend. This feature is considered
- exprerimental at this point.";
- }
-
leaf file-backed-streaming-threshold-in-megabytes {
default 128;
type non-zero-uint32-type;
import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
import org.opendaylight.controller.cluster.datastore.persisted.DatastoreSnapshot;
-import org.opendaylight.controller.cluster.datastore.persisted.FrontendClientMetadata;
import org.opendaylight.controller.cluster.datastore.persisted.FrontendShardDataTreeSnapshotMetadata;
import org.opendaylight.controller.cluster.datastore.persisted.MetadataShardDataTreeSnapshot;
import org.opendaylight.controller.cluster.datastore.persisted.ShardSnapshotState;
@Test
public void testSingleTransactionsWritesInQuickSuccession() throws Exception {
- final IntegrationTestKit testKit = new IntegrationTestKit(getSystem(), datastoreContextBuilder);
- try (AbstractDataStore dataStore = testKit.setupAbstractDataStore(
- testParameter, "testSingleTransactionsWritesInQuickSuccession", "cars-1")) {
+ final var testKit = new IntegrationTestKit(getSystem(), datastoreContextBuilder);
+ try (var dataStore = testKit.setupAbstractDataStore(testParameter,
+ "testSingleTransactionsWritesInQuickSuccession", "cars-1")) {
- final DOMStoreTransactionChain txChain = dataStore.createTransactionChain();
+ final var txChain = dataStore.createTransactionChain();
- DOMStoreWriteTransaction writeTx = txChain.newWriteOnlyTransaction();
+ var writeTx = txChain.newWriteOnlyTransaction();
writeTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer());
writeTx.write(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode());
testKit.doCommit(writeTx.ready());
.untilAsserted(() -> {
// verify frontend metadata has no holes in purged transactions causing overtime memory leak
final var localShard = dataStore.getActorUtils().findLocalShard("cars-1") .orElseThrow();
- FrontendShardDataTreeSnapshotMetadata frontendMetadata =
- (FrontendShardDataTreeSnapshotMetadata) dataStore.getActorUtils()
+ final var frontendMetadata = (FrontendShardDataTreeSnapshotMetadata) dataStore.getActorUtils()
.executeOperation(localShard, new RequestFrontendMetadata());
final var clientMeta = frontendMetadata.getClients().get(0);
- if (dataStore.getActorUtils().getDatastoreContext().isUseTellBasedProtocol()) {
- assertTellMetadata(clientMeta);
- } else {
- assertAskMetadata(clientMeta);
+ final var iterator = clientMeta.getCurrentHistories().iterator();
+ var metadata = iterator.next();
+ while (iterator.hasNext() && metadata.getHistoryId() != 1) {
+ metadata = iterator.next();
}
+ assertEquals("[[0..10]]", metadata.getPurgedTransactions().ranges().toString());
});
final var body = txChain.newReadOnlyTransaction().read(CarsModel.CAR_LIST_PATH)
}
}
- private static void assertAskMetadata(final FrontendClientMetadata clientMeta) {
- // ask based should track no metadata
- assertEquals(List.of(), clientMeta.getCurrentHistories());
- }
-
- private static void assertTellMetadata(final FrontendClientMetadata clientMeta) {
- final var iterator = clientMeta.getCurrentHistories().iterator();
- var metadata = iterator.next();
- while (iterator.hasNext() && metadata.getHistoryId() != 1) {
- metadata = iterator.next();
- }
- assertEquals("[[0..10]]", metadata.getPurgedTransactions().ranges().toString());
- }
-
@SuppressWarnings("checkstyle:IllegalCatch")
private void testTransactionCommitFailureWithNoShardLeader(final boolean writeOnly, final String testName)
throws Exception {
+++ /dev/null
-/*
- * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-package org.opendaylight.controller.cluster.datastore;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.ArgumentMatchers.argThat;
-import static org.mockito.ArgumentMatchers.eq;
-import static org.mockito.ArgumentMatchers.isA;
-import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.verify;
-
-import akka.actor.ActorRef;
-import akka.actor.ActorSelection;
-import akka.actor.ActorSystem;
-import akka.actor.Props;
-import akka.dispatch.Futures;
-import akka.testkit.javadsl.TestKit;
-import akka.util.Timeout;
-import com.codahale.metrics.MetricRegistry;
-import com.codahale.metrics.Timer;
-import com.google.common.base.Throwables;
-import com.google.common.collect.ImmutableMap;
-import com.google.common.util.concurrent.FluentFuture;
-import com.typesafe.config.Config;
-import com.typesafe.config.ConfigFactory;
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Objects;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeUnit;
-import org.junit.AfterClass;
-import org.junit.Before;
-import org.junit.BeforeClass;
-import org.mockito.ArgumentCaptor;
-import org.mockito.ArgumentMatcher;
-import org.mockito.Mock;
-import org.mockito.Mockito;
-import org.mockito.MockitoAnnotations;
-import org.opendaylight.controller.cluster.access.concepts.ClientIdentifier;
-import org.opendaylight.controller.cluster.access.concepts.MemberName;
-import org.opendaylight.controller.cluster.datastore.DatastoreContext.Builder;
-import org.opendaylight.controller.cluster.datastore.TransactionProxyTest.TestException;
-import org.opendaylight.controller.cluster.datastore.config.Configuration;
-import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
-import org.opendaylight.controller.cluster.datastore.messages.BatchedModificationsReply;
-import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
-import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction;
-import org.opendaylight.controller.cluster.datastore.messages.CreateTransactionReply;
-import org.opendaylight.controller.cluster.datastore.messages.DataExists;
-import org.opendaylight.controller.cluster.datastore.messages.DataExistsReply;
-import org.opendaylight.controller.cluster.datastore.messages.PrimaryShardInfo;
-import org.opendaylight.controller.cluster.datastore.messages.ReadData;
-import org.opendaylight.controller.cluster.datastore.messages.ReadDataReply;
-import org.opendaylight.controller.cluster.datastore.messages.ReadyLocalTransaction;
-import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply;
-import org.opendaylight.controller.cluster.datastore.modification.AbstractModification;
-import org.opendaylight.controller.cluster.datastore.modification.Modification;
-import org.opendaylight.controller.cluster.datastore.modification.WriteModification;
-import org.opendaylight.controller.cluster.datastore.shardstrategy.DefaultShardStrategy;
-import org.opendaylight.controller.cluster.datastore.shardstrategy.ShardStrategy;
-import org.opendaylight.controller.cluster.datastore.shardstrategy.ShardStrategyFactory;
-import org.opendaylight.controller.cluster.datastore.utils.ActorUtils;
-import org.opendaylight.controller.cluster.datastore.utils.MockConfiguration;
-import org.opendaylight.controller.cluster.raft.utils.DoNothingActor;
-import org.opendaylight.controller.md.cluster.datastore.model.CarsModel;
-import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
-import org.opendaylight.mdsal.common.api.ReadFailedException;
-import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
-import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
-import org.opendaylight.yangtools.yang.model.api.SchemaContext;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import scala.concurrent.Await;
-import scala.concurrent.Future;
-import scala.concurrent.duration.FiniteDuration;
-
-/**
- * Abstract base class for TransactionProxy unit tests.
- *
- * @author Thomas Pantelis
- */
-public abstract class AbstractTransactionProxyTest extends AbstractTest {
- protected final Logger log = LoggerFactory.getLogger(getClass());
-
- private static ActorSystem system;
- private static SchemaContext SCHEMA_CONTEXT;
-
- private final Configuration configuration = new MockConfiguration() {
- Map<String, ShardStrategy> strategyMap = ImmutableMap.<String, ShardStrategy>builder().put(
- TestModel.JUNK_QNAME.getLocalName(), new ShardStrategy() {
- @Override
- public String findShard(final YangInstanceIdentifier path) {
- return TestModel.JUNK_QNAME.getLocalName();
- }
- }).put(
- CarsModel.BASE_QNAME.getLocalName(), new ShardStrategy() {
- @Override
- public String findShard(final YangInstanceIdentifier path) {
- return CarsModel.BASE_QNAME.getLocalName();
- }
- }).build();
-
- @Override
- public ShardStrategy getStrategyForModule(final String moduleName) {
- return strategyMap.get(moduleName);
- }
-
- @Override
- public String getModuleNameFromNameSpace(final String nameSpace) {
- if (TestModel.JUNK_QNAME.getNamespace().toString().equals(nameSpace)) {
- return TestModel.JUNK_QNAME.getLocalName();
- } else if (CarsModel.BASE_QNAME.getNamespace().toString().equals(nameSpace)) {
- return CarsModel.BASE_QNAME.getLocalName();
- }
- return null;
- }
- };
-
- @Mock
- protected ActorUtils mockActorContext;
-
- protected TransactionContextFactory mockComponentFactory;
-
- @Mock
- private ClusterWrapper mockClusterWrapper;
-
- protected final String memberName = "mock-member";
-
- private final int operationTimeoutInSeconds = 2;
- protected final Builder dataStoreContextBuilder = DatastoreContext.newBuilder()
- .operationTimeoutInSeconds(operationTimeoutInSeconds);
-
- @BeforeClass
- public static void setUpClass() {
-
- Config config = ConfigFactory.parseMap(ImmutableMap.<String, Object>builder()
- .put("akka.actor.default-dispatcher.type",
- "akka.testkit.CallingThreadDispatcherConfigurator").build())
- .withFallback(ConfigFactory.load());
- system = ActorSystem.create("test", config);
- SCHEMA_CONTEXT = TestModel.createTestContext();
- }
-
- @AfterClass
- public static void tearDownClass() {
- TestKit.shutdownActorSystem(system);
- system = null;
- SCHEMA_CONTEXT = null;
- }
-
- @Before
- public void setUp() {
- MockitoAnnotations.initMocks(this);
-
- doReturn(getSystem()).when(mockActorContext).getActorSystem();
- doReturn(getSystem().dispatchers().defaultGlobalDispatcher()).when(mockActorContext).getClientDispatcher();
- doReturn(MemberName.forName(memberName)).when(mockActorContext).getCurrentMemberName();
- doReturn(new ShardStrategyFactory(configuration)).when(mockActorContext).getShardStrategyFactory();
- doReturn(SCHEMA_CONTEXT).when(mockActorContext).getSchemaContext();
- doReturn(new Timeout(operationTimeoutInSeconds, TimeUnit.SECONDS)).when(mockActorContext).getOperationTimeout();
- doReturn(mockClusterWrapper).when(mockActorContext).getClusterWrapper();
- doReturn(mockClusterWrapper).when(mockActorContext).getClusterWrapper();
- doReturn(dataStoreContextBuilder.build()).when(mockActorContext).getDatastoreContext();
- doReturn(new Timeout(5, TimeUnit.SECONDS)).when(mockActorContext).getTransactionCommitOperationTimeout();
-
- final ClientIdentifier mockClientId = MockIdentifiers.clientIdentifier(getClass(), memberName);
- mockComponentFactory = new TransactionContextFactory(mockActorContext, mockClientId);
-
- Timer timer = new MetricRegistry().timer("test");
- doReturn(timer).when(mockActorContext).getOperationTimer(any(String.class));
- }
-
- protected ActorSystem getSystem() {
- return system;
- }
-
- protected CreateTransaction eqCreateTransaction(final String expMemberName,
- final TransactionType type) {
- class CreateTransactionArgumentMatcher implements ArgumentMatcher<CreateTransaction> {
- @Override
- public boolean matches(final CreateTransaction argument) {
- return argument.getTransactionId().getHistoryId().getClientId().getFrontendId().getMemberName()
- .getName().equals(expMemberName) && argument.getTransactionType() == type.ordinal();
- }
- }
-
- return argThat(new CreateTransactionArgumentMatcher());
- }
-
- protected DataExists eqDataExists() {
- class DataExistsArgumentMatcher implements ArgumentMatcher<DataExists> {
- @Override
- public boolean matches(final DataExists argument) {
- return argument.getPath().equals(TestModel.TEST_PATH);
- }
- }
-
- return argThat(new DataExistsArgumentMatcher());
- }
-
- protected ReadData eqReadData() {
- return eqReadData(TestModel.TEST_PATH);
- }
-
- protected ReadData eqReadData(final YangInstanceIdentifier path) {
- class ReadDataArgumentMatcher implements ArgumentMatcher<ReadData> {
- @Override
- public boolean matches(final ReadData argument) {
- return argument.getPath().equals(path);
- }
- }
-
- return argThat(new ReadDataArgumentMatcher());
- }
-
- protected Future<Object> readyTxReply(final String path) {
- return Futures.successful((Object)new ReadyTransactionReply(path));
- }
-
-
- protected Future<ReadDataReply> readDataReply(final NormalizedNode data) {
- return Futures.successful(new ReadDataReply(data, DataStoreVersions.CURRENT_VERSION));
- }
-
- protected Future<DataExistsReply> dataExistsReply(final boolean exists) {
- return Futures.successful(new DataExistsReply(exists, DataStoreVersions.CURRENT_VERSION));
- }
-
- protected Future<BatchedModificationsReply> batchedModificationsReply(final int count) {
- return Futures.successful(new BatchedModificationsReply(count));
- }
-
- @SuppressWarnings("unchecked")
- protected Future<Object> incompleteFuture() {
- return mock(Future.class);
- }
-
- protected ActorSelection actorSelection(final ActorRef actorRef) {
- return getSystem().actorSelection(actorRef.path());
- }
-
- protected void expectBatchedModifications(final ActorRef actorRef, final int count) {
- doReturn(batchedModificationsReply(count)).when(mockActorContext).executeOperationAsync(
- eq(actorSelection(actorRef)), isA(BatchedModifications.class), any(Timeout.class));
- }
-
- protected void expectBatchedModifications(final int count) {
- doReturn(batchedModificationsReply(count)).when(mockActorContext).executeOperationAsync(
- any(ActorSelection.class), isA(BatchedModifications.class), any(Timeout.class));
- }
-
- protected void expectBatchedModificationsReady(final ActorRef actorRef) {
- expectBatchedModificationsReady(actorRef, false);
- }
-
- protected void expectBatchedModificationsReady(final ActorRef actorRef, final boolean doCommitOnReady) {
- doReturn(doCommitOnReady ? Futures.successful(new CommitTransactionReply().toSerializable()) :
- readyTxReply(actorRef.path().toString())).when(mockActorContext).executeOperationAsync(
- eq(actorSelection(actorRef)), isA(BatchedModifications.class), any(Timeout.class));
- }
-
- protected void expectIncompleteBatchedModifications() {
- doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync(
- any(ActorSelection.class), isA(BatchedModifications.class), any(Timeout.class));
- }
-
- protected void expectFailedBatchedModifications(final ActorRef actorRef) {
- doReturn(Futures.failed(new TestException())).when(mockActorContext).executeOperationAsync(
- eq(actorSelection(actorRef)), isA(BatchedModifications.class), any(Timeout.class));
- }
-
- protected void expectReadyLocalTransaction(final ActorRef actorRef, final boolean doCommitOnReady) {
- doReturn(doCommitOnReady ? Futures.successful(new CommitTransactionReply().toSerializable()) :
- readyTxReply(actorRef.path().toString())).when(mockActorContext).executeOperationAsync(
- eq(actorSelection(actorRef)), isA(ReadyLocalTransaction.class), any(Timeout.class));
- }
-
- protected CreateTransactionReply createTransactionReply(final ActorRef actorRef, final short transactionVersion) {
- return new CreateTransactionReply(actorRef.path().toString(), nextTransactionId(), transactionVersion);
- }
-
- protected ActorRef setupActorContextWithoutInitialCreateTransaction(final ActorSystem actorSystem) {
- return setupActorContextWithoutInitialCreateTransaction(actorSystem, DefaultShardStrategy.DEFAULT_SHARD);
- }
-
- protected ActorRef setupActorContextWithoutInitialCreateTransaction(final ActorSystem actorSystem,
- final String shardName) {
- return setupActorContextWithoutInitialCreateTransaction(actorSystem, shardName,
- DataStoreVersions.CURRENT_VERSION);
- }
-
- protected ActorRef setupActorContextWithoutInitialCreateTransaction(final ActorSystem actorSystem,
- final String shardName, final short transactionVersion) {
- ActorRef actorRef = actorSystem.actorOf(Props.create(DoNothingActor.class));
- log.info("Created mock shard actor {}", actorRef);
-
- doReturn(actorSystem.actorSelection(actorRef.path()))
- .when(mockActorContext).actorSelection(actorRef.path().toString());
-
- doReturn(primaryShardInfoReply(actorSystem, actorRef, transactionVersion))
- .when(mockActorContext).findPrimaryShardAsync(eq(shardName));
-
- return actorRef;
- }
-
- protected Future<PrimaryShardInfo> primaryShardInfoReply(final ActorSystem actorSystem, final ActorRef actorRef) {
- return primaryShardInfoReply(actorSystem, actorRef, DataStoreVersions.CURRENT_VERSION);
- }
-
- protected Future<PrimaryShardInfo> primaryShardInfoReply(final ActorSystem actorSystem, final ActorRef actorRef,
- final short transactionVersion) {
- return Futures.successful(new PrimaryShardInfo(actorSystem.actorSelection(actorRef.path()),
- transactionVersion));
- }
-
- protected ActorRef setupActorContextWithInitialCreateTransaction(final ActorSystem actorSystem,
- final TransactionType type, final short transactionVersion, final String shardName) {
- ActorRef shardActorRef = setupActorContextWithoutInitialCreateTransaction(actorSystem, shardName,
- transactionVersion);
-
- return setupActorContextWithInitialCreateTransaction(actorSystem, type, transactionVersion,
- memberName, shardActorRef);
- }
-
- protected ActorRef setupActorContextWithInitialCreateTransaction(final ActorSystem actorSystem,
- final TransactionType type, final short transactionVersion, final String prefix,
- final ActorRef shardActorRef) {
-
- ActorRef txActorRef;
- if (type == TransactionType.WRITE_ONLY
- && dataStoreContextBuilder.build().isWriteOnlyTransactionOptimizationsEnabled()) {
- txActorRef = shardActorRef;
- } else {
- txActorRef = actorSystem.actorOf(Props.create(DoNothingActor.class));
- log.info("Created mock shard Tx actor {}", txActorRef);
-
- doReturn(actorSystem.actorSelection(txActorRef.path()))
- .when(mockActorContext).actorSelection(txActorRef.path().toString());
-
- doReturn(Futures.successful(createTransactionReply(txActorRef, transactionVersion))).when(mockActorContext)
- .executeOperationAsync(eq(actorSystem.actorSelection(shardActorRef.path())),
- eqCreateTransaction(prefix, type), any(Timeout.class));
- }
-
- return txActorRef;
- }
-
- protected ActorRef setupActorContextWithInitialCreateTransaction(final ActorSystem actorSystem,
- final TransactionType type) {
- return setupActorContextWithInitialCreateTransaction(actorSystem, type, DataStoreVersions.CURRENT_VERSION,
- DefaultShardStrategy.DEFAULT_SHARD);
- }
-
- protected ActorRef setupActorContextWithInitialCreateTransaction(final ActorSystem actorSystem,
- final TransactionType type,
- final String shardName) {
- return setupActorContextWithInitialCreateTransaction(actorSystem, type, DataStoreVersions.CURRENT_VERSION,
- shardName);
- }
-
- @SuppressWarnings({"checkstyle:avoidHidingCauseException", "checkstyle:IllegalThrows"})
- protected void propagateReadFailedExceptionCause(final FluentFuture<?> future) throws Throwable {
- try {
- future.get(5, TimeUnit.SECONDS);
- fail("Expected ReadFailedException");
- } catch (ExecutionException e) {
- final Throwable cause = e.getCause();
- assertTrue("Unexpected cause: " + cause.getClass(), cause instanceof ReadFailedException);
- throw Throwables.getRootCause(cause);
- }
- }
-
- protected List<BatchedModifications> captureBatchedModifications(final ActorRef actorRef) {
- ArgumentCaptor<BatchedModifications> batchedModificationsCaptor =
- ArgumentCaptor.forClass(BatchedModifications.class);
- verify(mockActorContext, Mockito.atLeastOnce()).executeOperationAsync(
- eq(actorSelection(actorRef)), batchedModificationsCaptor.capture(), any(Timeout.class));
-
- List<BatchedModifications> batchedModifications = filterCaptured(
- batchedModificationsCaptor, BatchedModifications.class);
- return batchedModifications;
- }
-
- protected <T> List<T> filterCaptured(final ArgumentCaptor<T> captor, final Class<T> type) {
- List<T> captured = new ArrayList<>();
- for (T c: captor.getAllValues()) {
- if (type.isInstance(c)) {
- captured.add(c);
- }
- }
-
- return captured;
- }
-
- protected void verifyOneBatchedModification(final ActorRef actorRef, final Modification expected,
- final boolean expIsReady) {
- List<BatchedModifications> batchedModifications = captureBatchedModifications(actorRef);
- assertEquals("Captured BatchedModifications count", 1, batchedModifications.size());
-
- verifyBatchedModifications(batchedModifications.get(0), expIsReady, expIsReady, expected);
- }
-
- protected void verifyBatchedModifications(final Object message, final boolean expIsReady,
- final Modification... expected) {
- verifyBatchedModifications(message, expIsReady, false, expected);
- }
-
- protected void verifyBatchedModifications(final Object message, final boolean expIsReady,
- final boolean expIsDoCommitOnReady, final Modification... expected) {
- assertEquals("Message type", BatchedModifications.class, message.getClass());
- BatchedModifications batchedModifications = (BatchedModifications)message;
- assertEquals("BatchedModifications size", expected.length, batchedModifications.getModifications().size());
- assertEquals("isReady", expIsReady, batchedModifications.isReady());
- assertEquals("isDoCommitOnReady", expIsDoCommitOnReady, batchedModifications.isDoCommitOnReady());
- for (int i = 0; i < batchedModifications.getModifications().size(); i++) {
- Modification actual = batchedModifications.getModifications().get(i);
- assertEquals("Modification type", expected[i].getClass(), actual.getClass());
- assertEquals("getPath", ((AbstractModification)expected[i]).getPath(),
- ((AbstractModification)actual).getPath());
- if (actual instanceof WriteModification) {
- assertEquals("getData", ((WriteModification)expected[i]).getData(),
- ((WriteModification)actual).getData());
- }
- }
- }
-
- @SuppressWarnings("checkstyle:IllegalCatch")
- protected void verifyCohortFutures(final AbstractThreePhaseCommitCohort<?> proxy,
- final Object... expReplies) {
- assertEquals("getReadyOperationFutures size", expReplies.length,
- proxy.getCohortFutures().size());
-
- List<Object> futureResults = new ArrayList<>();
- for (Future<?> future : proxy.getCohortFutures()) {
- assertNotNull("Ready operation Future is null", future);
- try {
- futureResults.add(Await.result(future, FiniteDuration.create(5, TimeUnit.SECONDS)));
- } catch (Exception e) {
- futureResults.add(e);
- }
- }
-
- for (Object expReply : expReplies) {
- boolean found = false;
- Iterator<?> iter = futureResults.iterator();
- while (iter.hasNext()) {
- Object actual = iter.next();
- if (CommitTransactionReply.isSerializedType(expReply)
- && CommitTransactionReply.isSerializedType(actual)
- || expReply instanceof ActorSelection && Objects.equals(expReply, actual)) {
- found = true;
- } else if (expReply instanceof Class && ((Class<?>) expReply).isInstance(actual)) {
- found = true;
- }
-
- if (found) {
- iter.remove();
- break;
- }
- }
-
- if (!found) {
- fail(String.format("No cohort Future response found for %s. Actual: %s", expReply, futureResults));
- }
- }
- }
-}
import org.junit.Test;
import org.opendaylight.mdsal.binding.dom.codec.impl.BindingCodecContext;
import org.opendaylight.mdsal.binding.runtime.spi.BindingRuntimeHelpers;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.config.distributed.datastore.provider.rev140612.DataStoreProperties.ExportOnRecovery;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.config.distributed.datastore.provider.rev140612.DataStorePropertiesContainer;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.config.distributed.datastore.provider.rev231229.DataStoreProperties.ExportOnRecovery;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.config.distributed.datastore.provider.rev231229.DataStorePropertiesContainer;
/**
* Unit tests for DatastoreContextIntrospector.
import java.util.concurrent.TimeUnit;
import org.junit.Assert;
import org.junit.Test;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.config.distributed.datastore.provider.rev140612.DataStoreProperties.ExportOnRecovery;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.config.distributed.datastore.provider.rev231229.DataStoreProperties.ExportOnRecovery;
public class DatastoreContextTest {
+++ /dev/null
-/*
- * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-package org.opendaylight.controller.cluster.datastore;
-
-import static org.junit.Assert.assertSame;
-import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.ArgumentMatchers.anyString;
-import static org.mockito.ArgumentMatchers.same;
-import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.never;
-import static org.mockito.Mockito.reset;
-import static org.mockito.Mockito.verify;
-
-import com.google.common.util.concurrent.Futures;
-import com.google.common.util.concurrent.ListenableFuture;
-import java.util.ArrayList;
-import java.util.List;
-import org.junit.Test;
-import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
-import org.slf4j.Logger;
-import scala.concurrent.Future;
-
-/**
- * Unit tests for DebugThreePhaseCommitCohort.
- *
- * @author Thomas Pantelis
- */
-public class DebugThreePhaseCommitCohortTest {
- private final TransactionIdentifier transactionId = MockIdentifiers.transactionIdentifier(
- DebugThreePhaseCommitCohortTest.class, "mock");
-
- @Test
- public void test() {
- AbstractThreePhaseCommitCohort<?> mockDelegate = mock(AbstractThreePhaseCommitCohort.class);
- Exception failure = new Exception("mock failure");
- ListenableFuture<Object> expFailedFuture = Futures.immediateFailedFuture(failure);
- doReturn(expFailedFuture).when(mockDelegate).canCommit();
- doReturn(expFailedFuture).when(mockDelegate).preCommit();
- doReturn(expFailedFuture).when(mockDelegate).commit();
-
- ListenableFuture<Object> expAbortFuture = Futures.immediateFuture(null);
- doReturn(expAbortFuture).when(mockDelegate).abort();
-
- List<Future<Object>> expCohortFutures = new ArrayList<>();
- doReturn(expCohortFutures).when(mockDelegate).getCohortFutures();
-
- Throwable debugContext = new RuntimeException("mock");
- DebugThreePhaseCommitCohort cohort = new DebugThreePhaseCommitCohort(transactionId, mockDelegate, debugContext);
-
- Logger mockLogger = mock(Logger.class);
- cohort.setLogger(mockLogger);
-
- assertSame("canCommit", expFailedFuture, cohort.canCommit());
- verify(mockLogger).warn(anyString(), same(transactionId), same(failure), same(debugContext));
-
- reset(mockLogger);
- assertSame("preCommit", expFailedFuture, cohort.preCommit());
- verify(mockLogger).warn(anyString(), same(transactionId), same(failure), same(debugContext));
-
- reset(mockLogger);
- assertSame("commit", expFailedFuture, cohort.commit());
- verify(mockLogger).warn(anyString(), same(transactionId), same(failure), same(debugContext));
-
- assertSame("abort", expAbortFuture, cohort.abort());
-
- assertSame("getCohortFutures", expCohortFutures, cohort.getCohortFutures());
-
- reset(mockLogger);
- ListenableFuture<Boolean> expSuccessFuture = Futures.immediateFuture(Boolean.TRUE);
- doReturn(expSuccessFuture).when(mockDelegate).canCommit();
-
- assertSame("canCommit", expSuccessFuture, cohort.canCommit());
- verify(mockLogger, never()).warn(anyString(), any(TransactionIdentifier.class), any(Throwable.class),
- any(Throwable.class));
- }
-}
+++ /dev/null
-/*
- * Copyright (c) 2014, 2015 Cisco Systems, Inc. and others. All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-package org.opendaylight.controller.cluster.datastore;
-
-import static org.junit.Assert.assertEquals;
-import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.mock;
-
-import org.junit.Before;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.mockito.Mock;
-import org.mockito.junit.MockitoJUnitRunner;
-import org.opendaylight.controller.cluster.datastore.utils.ActorUtils;
-
-@RunWith(MockitoJUnitRunner.StrictStubs.class)
-public class DelayedTransactionContextWrapperTest {
- @Mock
- private ActorUtils actorUtils;
-
- @Mock
- private TransactionContext transactionContext;
-
- private DelayedTransactionContextWrapper transactionContextWrapper;
-
- @Before
- public void setUp() {
- doReturn(DatastoreContext.newBuilder().build()).when(actorUtils).getDatastoreContext();
- transactionContextWrapper = new DelayedTransactionContextWrapper(MockIdentifiers.transactionIdentifier(
- DelayedTransactionContextWrapperTest.class, "mock"), actorUtils, "mock");
- }
-
- @Test
- public void testExecutePriorTransactionOperations() {
- for (int i = 0; i < 100; i++) {
- transactionContextWrapper.maybeExecuteTransactionOperation(mock(TransactionOperation.class));
- }
- assertEquals(901, transactionContextWrapper.getLimiter().availablePermits());
-
- transactionContextWrapper.executePriorTransactionOperations(transactionContext);
-
- assertEquals(1001, transactionContextWrapper.getLimiter().availablePermits());
- }
-}
+++ /dev/null
-/*
- * Copyright (c) 2021 PANTHEON.tech, s.r.o. and others. All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-package org.opendaylight.controller.cluster.datastore;
-
-import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-
-import org.junit.Before;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.mockito.Mock;
-import org.mockito.junit.MockitoJUnitRunner;
-import org.opendaylight.controller.cluster.datastore.utils.ActorUtils;
-
-@RunWith(MockitoJUnitRunner.StrictStubs.class)
-public class DirectTransactionContextWrapperTest {
- @Mock
- private ActorUtils actorUtils;
-
- @Mock
- private TransactionContext transactionContext;
-
- @Mock
- private TransactionOperation transactionOperation;
-
- private DirectTransactionContextWrapper contextWrapper;
-
- @Before
- public void setUp() {
- doReturn(DatastoreContext.newBuilder().build()).when(actorUtils).getDatastoreContext();
- contextWrapper = new DirectTransactionContextWrapper(MockIdentifiers.transactionIdentifier(
- DirectTransactionContextWrapperTest.class, "mock"), actorUtils, "mock",
- transactionContext);
- }
-
- @Test
- public void testMaybeExecuteTransactionOperation() {
- contextWrapper.maybeExecuteTransactionOperation(transactionOperation);
- verify(transactionOperation, times(1)).invoke(transactionContext, null);
- }
-}
package org.opendaylight.controller.cluster.datastore;
import static org.awaitility.Awaitility.await;
-import static org.hamcrest.CoreMatchers.containsString;
import static org.hamcrest.CoreMatchers.instanceOf;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.equalTo;
import org.mockito.stubbing.Answer;
import org.opendaylight.controller.cluster.access.client.RequestTimeoutException;
import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
-import org.opendaylight.controller.cluster.databroker.ClientBackedDataStore;
import org.opendaylight.controller.cluster.databroker.ConcurrentDOMDataBroker;
import org.opendaylight.controller.cluster.databroker.TestClientBackedDataStore;
import org.opendaylight.controller.cluster.datastore.DatastoreContext.Builder;
import org.opendaylight.controller.cluster.datastore.TestShard.RequestFrontendMetadata;
import org.opendaylight.controller.cluster.datastore.TestShard.StartDropMessages;
import org.opendaylight.controller.cluster.datastore.TestShard.StopDropMessages;
-import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
import org.opendaylight.controller.cluster.datastore.messages.ForwardedReadyTransaction;
import org.opendaylight.controller.cluster.datastore.messages.GetShardDataTree;
// wait to let the shard catch up with purged
await("Range set leak test").atMost(5, TimeUnit.SECONDS)
- .pollInterval(500, TimeUnit.MILLISECONDS)
- .untilAsserted(() -> {
- final var localShard = leaderDistributedDataStore.getActorUtils().findLocalShard("cars")
- .orElseThrow();
- final var frontendMetadata =
- (FrontendShardDataTreeSnapshotMetadata) leaderDistributedDataStore.getActorUtils()
- .executeOperation(localShard, new RequestFrontendMetadata());
-
- final var clientMeta = frontendMetadata.getClients().get(0);
- if (leaderDistributedDataStore.getActorUtils().getDatastoreContext().isUseTellBasedProtocol()) {
- assertTellClientMetadata(clientMeta, numCars * 2);
- } else {
- assertAskClientMetadata(clientMeta);
- }
- });
+ .pollInterval(500, TimeUnit.MILLISECONDS)
+ .untilAsserted(() -> {
+ final var localShard = leaderDistributedDataStore.getActorUtils().findLocalShard("cars").orElseThrow();
+ final var frontendMetadata =
+ (FrontendShardDataTreeSnapshotMetadata) leaderDistributedDataStore.getActorUtils()
+ .executeOperation(localShard, new RequestFrontendMetadata());
+
+ assertClientMetadata(frontendMetadata.getClients().get(0), numCars * 2);
+ });
try (var tx = txChain.newReadOnlyTransaction()) {
final var body = tx.read(CarsModel.CAR_LIST_PATH).get(5, TimeUnit.SECONDS).orElseThrow().body();
}
}
- private static void assertAskClientMetadata(final FrontendClientMetadata clientMeta) {
- // ask based should track no metadata
- assertEquals(List.of(), clientMeta.getCurrentHistories());
- }
-
- private static void assertTellClientMetadata(final FrontendClientMetadata clientMeta, final long lastPurged) {
+ private static void assertClientMetadata(final FrontendClientMetadata clientMeta, final long lastPurged) {
final var iterator = clientMeta.getCurrentHistories().iterator();
var metadata = iterator.next();
while (iterator.hasNext() && metadata.getHistoryId() != 1) {
@Test
public void testCloseTransactionMetadataLeak() throws Exception {
- // FIXME: CONTROLLER-2016: ask-based frontend triggers this:
- //
- // java.lang.IllegalStateException: Previous transaction
- // member-2-datastore-testCloseTransactionMetadataLeak-fe-0-chn-1-txn-1-0 is not ready yet
- // at org.opendaylight.controller.cluster.datastore.TransactionChainProxy$Allocated.checkReady()
- // at org.opendaylight.controller.cluster.datastore.TransactionChainProxy.newReadOnlyTransaction()
- assumeTrue(testParameter.isAssignableFrom(ClientBackedDataStore.class));
-
initDatastoresWithCars("testCloseTransactionMetadataLeak");
- final DOMStoreTransactionChain txChain = followerDistributedDataStore.createTransactionChain();
+ final var txChain = followerDistributedDataStore.createTransactionChain();
- DOMStoreWriteTransaction writeTx = txChain.newWriteOnlyTransaction();
+ var writeTx = txChain.newWriteOnlyTransaction();
writeTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer());
writeTx.write(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode());
followerTestKit.doCommit(writeTx.ready());
// wait to let the shard catch up with purged
await("wait for purges to settle").atMost(5, TimeUnit.SECONDS)
- .pollInterval(500, TimeUnit.MILLISECONDS)
- .untilAsserted(() -> {
- final var localShard = leaderDistributedDataStore.getActorUtils().findLocalShard("cars")
- .orElseThrow();
- final var frontendMetadata =
- (FrontendShardDataTreeSnapshotMetadata) leaderDistributedDataStore.getActorUtils()
- .executeOperation(localShard, new RequestFrontendMetadata());
-
- final var clientMeta = frontendMetadata.getClients().get(0);
- if (leaderDistributedDataStore.getActorUtils().getDatastoreContext().isUseTellBasedProtocol()) {
- assertTellClientMetadata(clientMeta, numCars * 2);
- } else {
- assertAskClientMetadata(clientMeta);
- }
- });
+ .pollInterval(500, TimeUnit.MILLISECONDS)
+ .untilAsserted(() -> {
+ final var localShard = leaderDistributedDataStore.getActorUtils().findLocalShard("cars").orElseThrow();
+ final var frontendMetadata =
+ (FrontendShardDataTreeSnapshotMetadata) leaderDistributedDataStore.getActorUtils()
+ .executeOperation(localShard, new RequestFrontendMetadata());
+
+ assertClientMetadata(frontendMetadata.getClients().get(0), numCars * 2);
+ });
}
@Test
raftState -> assertEquals("getRaftState", "IsolatedLeader", raftState.getRaftState()));
final var noShardLeaderCohort = noShardLeaderWriteTx.ready();
- final ListenableFuture<Boolean> canCommit;
-
- // There is difference in behavior here:
- if (!leaderDistributedDataStore.getActorUtils().getDatastoreContext().isUseTellBasedProtocol()) {
- // ask-based canCommit() times out and aborts
- final var ex = assertThrows(ExecutionException.class,
- () -> leaderTestKit.doCommit(noShardLeaderCohort)).getCause();
- assertThat(ex, instanceOf(NoShardLeaderException.class));
- assertThat(ex.getMessage(), containsString(
- "Shard member-1-shard-cars-testTransactionWithIsolatedLeader currently has no leader."));
- canCommit = null;
- } else {
- // tell-based canCommit() does not have a real timeout and hence continues
- canCommit = noShardLeaderCohort.canCommit();
- Uninterruptibles.sleepUninterruptibly(commitTimeout, TimeUnit.SECONDS);
- assertFalse(canCommit.isDone());
- }
+ // tell-based canCommit() does not have a real timeout and hence continues
+ final var canCommit = noShardLeaderCohort.canCommit();
+ Uninterruptibles.sleepUninterruptibly(commitTimeout, TimeUnit.SECONDS);
+ assertFalse(canCommit.isDone());
sendDatastoreContextUpdate(leaderDistributedDataStore, leaderDatastoreContextBuilder
.shardElectionTimeoutFactor(100));
leaderTestKit.doCommit(preIsolatedLeaderTxCohort);
leaderTestKit.doCommit(successTxCohort);
- // continuation of tell-based protocol: readied transaction will complete commit, but will report an OLFE
- if (canCommit != null) {
- final var ex = assertThrows(ExecutionException.class,
- () -> canCommit.get(commitTimeout, TimeUnit.SECONDS)).getCause();
- assertThat(ex, instanceOf(OptimisticLockFailedException.class));
- assertEquals("Optimistic lock failed for path " + CarsModel.BASE_PATH, ex.getMessage());
- final var cause = ex.getCause();
- assertThat(cause, instanceOf(ConflictingModificationAppliedException.class));
- final var cmae = (ConflictingModificationAppliedException) cause;
- assertEquals("Node was created by other transaction.", cmae.getMessage());
- assertEquals(CarsModel.BASE_PATH, cmae.getPath());
- }
+ // continuation of canCommit(): readied transaction will complete commit, but will report an OLFE
+ final var ex = assertThrows(ExecutionException.class,
+ () -> canCommit.get(commitTimeout, TimeUnit.SECONDS)).getCause();
+ assertThat(ex, instanceOf(OptimisticLockFailedException.class));
+ assertEquals("Optimistic lock failed for path " + CarsModel.BASE_PATH, ex.getMessage());
+ final var cause = ex.getCause();
+ assertThat(cause, instanceOf(ConflictingModificationAppliedException.class));
+ final var cmae = (ConflictingModificationAppliedException) cause;
+ assertEquals("Node was created by other transaction.", cmae.getMessage());
+ assertEquals(CarsModel.BASE_PATH, cmae.getPath());
}
@Test
@Test
public void testReadWriteMessageSlicing() throws Exception {
- // The slicing is only implemented for tell-based protocol
- assumeTrue(ClientBackedDataStore.class.isAssignableFrom(testParameter));
-
leaderDatastoreContextBuilder.maximumMessageSliceSize(100);
followerDatastoreContextBuilder.maximumMessageSliceSize(100);
initDatastoresWithCars("testLargeReadReplySlicing");
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
-import org.opendaylight.controller.cluster.databroker.ClientBackedDataStore;
import org.opendaylight.controller.cluster.datastore.DatastoreContext.Builder;
import org.opendaylight.controller.cluster.datastore.config.Configuration;
import org.opendaylight.controller.cluster.datastore.config.ConfigurationImpl;
final String typeName, final String moduleShardsConfig,
final String modulesConfig, final boolean waitUntilLeader,
final EffectiveModelContext schemaContext,
- final String... shardNames)
- throws Exception {
+ final String... shardNames) throws Exception {
final ClusterWrapper cluster = new ClusterWrapperImpl(getSystem());
final Configuration config = new ConfigurationImpl(moduleShardsConfig, modulesConfig);
setDataStoreName(typeName);
- // Make sure we set up datastore context correctly
- datastoreContextBuilder.useTellBasedProtocol(ClientBackedDataStore.class.isAssignableFrom(implementation));
-
final DatastoreContext datastoreContext = datastoreContextBuilder.build();
final DatastoreContextFactory mockContextFactory = mock(DatastoreContextFactory.class);
doReturn(datastoreContext).when(mockContextFactory).getBaseDatastoreContext();
import org.opendaylight.controller.cluster.raft.persisted.SimpleReplicatedLogEntry;
import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.config.distributed.datastore.provider.rev140612.DataStoreProperties.ExportOnRecovery;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.config.distributed.datastore.provider.rev231229.DataStoreProperties.ExportOnRecovery;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
import org.opendaylight.yangtools.yang.data.tree.api.DataTree;
+++ /dev/null
-/*
- * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-package org.opendaylight.controller.cluster.datastore;
-
-import static org.junit.Assert.assertTrue;
-import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.doThrow;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.verify;
-
-import akka.actor.ActorSelection;
-import com.google.common.util.concurrent.SettableFuture;
-import java.util.Optional;
-import org.junit.Before;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.mockito.Mock;
-import org.mockito.junit.MockitoJUnitRunner;
-import org.opendaylight.controller.cluster.access.concepts.ClientIdentifier;
-import org.opendaylight.controller.cluster.access.concepts.FrontendIdentifier;
-import org.opendaylight.controller.cluster.access.concepts.FrontendType;
-import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier;
-import org.opendaylight.controller.cluster.access.concepts.MemberName;
-import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
-import org.opendaylight.controller.cluster.datastore.messages.DataExists;
-import org.opendaylight.controller.cluster.datastore.messages.ReadData;
-import org.opendaylight.mdsal.dom.spi.store.DOMStoreReadTransaction;
-import org.opendaylight.mdsal.dom.spi.store.DOMStoreReadWriteTransaction;
-import org.opendaylight.mdsal.dom.spi.store.DOMStoreWriteTransaction;
-import org.opendaylight.yangtools.util.concurrent.FluentFutures;
-import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
-import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
-import scala.concurrent.Future;
-
-@RunWith(MockitoJUnitRunner.StrictStubs.class)
-public class LocalTransactionContextTest {
- @Mock
- private DOMStoreReadWriteTransaction readWriteTransaction;
- @Mock
- private LocalTransactionReadySupport mockReadySupport;
-
- private LocalTransactionContext localTransactionContext;
-
- @Before
- public void setUp() {
- final TransactionIdentifier txId = new TransactionIdentifier(new LocalHistoryIdentifier(ClientIdentifier.create(
- FrontendIdentifier.create(MemberName.forName("member"), FrontendType.forName("type")), 0), 0), 0);
-
- localTransactionContext = new LocalTransactionContext(readWriteTransaction, txId, mockReadySupport) {
- @Override
- DOMStoreWriteTransaction getWriteDelegate() {
- return readWriteTransaction;
- }
-
- @Override
- DOMStoreReadTransaction getReadDelegate() {
- return readWriteTransaction;
- }
- };
- }
-
- @Test
- public void testWrite() {
- YangInstanceIdentifier yangInstanceIdentifier = YangInstanceIdentifier.of();
- NormalizedNode normalizedNode = mock(NormalizedNode.class);
- localTransactionContext.executeWrite(yangInstanceIdentifier, normalizedNode, null);
- verify(readWriteTransaction).write(yangInstanceIdentifier, normalizedNode);
- }
-
- @Test
- public void testMerge() {
- YangInstanceIdentifier yangInstanceIdentifier = YangInstanceIdentifier.of();
- NormalizedNode normalizedNode = mock(NormalizedNode.class);
- localTransactionContext.executeMerge(yangInstanceIdentifier, normalizedNode, null);
- verify(readWriteTransaction).merge(yangInstanceIdentifier, normalizedNode);
- }
-
- @Test
- public void testDelete() {
- YangInstanceIdentifier yangInstanceIdentifier = YangInstanceIdentifier.of();
- localTransactionContext.executeDelete(yangInstanceIdentifier, null);
- verify(readWriteTransaction).delete(yangInstanceIdentifier);
- }
-
- @Test
- public void testRead() {
- YangInstanceIdentifier yangInstanceIdentifier = YangInstanceIdentifier.of();
- NormalizedNode normalizedNode = mock(NormalizedNode.class);
- doReturn(FluentFutures.immediateFluentFuture(Optional.of(normalizedNode))).when(readWriteTransaction)
- .read(yangInstanceIdentifier);
- localTransactionContext.executeRead(new ReadData(yangInstanceIdentifier, DataStoreVersions.CURRENT_VERSION),
- SettableFuture.create(), null);
- verify(readWriteTransaction).read(yangInstanceIdentifier);
- }
-
- @Test
- public void testExists() {
- YangInstanceIdentifier yangInstanceIdentifier = YangInstanceIdentifier.of();
- doReturn(FluentFutures.immediateTrueFluentFuture()).when(readWriteTransaction).exists(yangInstanceIdentifier);
- localTransactionContext.executeRead(new DataExists(yangInstanceIdentifier, DataStoreVersions.CURRENT_VERSION),
- SettableFuture.create(), null);
- verify(readWriteTransaction).exists(yangInstanceIdentifier);
- }
-
- @Test
- public void testReady() {
- final LocalThreePhaseCommitCohort mockCohort = mock(LocalThreePhaseCommitCohort.class);
- doReturn(akka.dispatch.Futures.successful(null)).when(mockCohort).initiateCoordinatedCommit(Optional.empty());
- doReturn(mockCohort).when(mockReadySupport).onTransactionReady(readWriteTransaction, null);
-
- Future<ActorSelection> future = localTransactionContext.readyTransaction(null, Optional.empty());
- assertTrue(future.isCompleted());
-
- verify(mockReadySupport).onTransactionReady(readWriteTransaction, null);
- }
-
- @Test
- public void testReadyWithWriteError() {
- YangInstanceIdentifier yangInstanceIdentifier = YangInstanceIdentifier.of();
- NormalizedNode normalizedNode = mock(NormalizedNode.class);
- RuntimeException error = new RuntimeException("mock");
- doThrow(error).when(readWriteTransaction).write(yangInstanceIdentifier, normalizedNode);
-
- localTransactionContext.executeWrite(yangInstanceIdentifier, normalizedNode, null);
- localTransactionContext.executeWrite(yangInstanceIdentifier, normalizedNode, null);
-
- verify(readWriteTransaction).write(yangInstanceIdentifier, normalizedNode);
-
- doReadyWithExpectedError(error);
- }
-
- @Test
- public void testReadyWithMergeError() {
- YangInstanceIdentifier yangInstanceIdentifier = YangInstanceIdentifier.of();
- NormalizedNode normalizedNode = mock(NormalizedNode.class);
- RuntimeException error = new RuntimeException("mock");
- doThrow(error).when(readWriteTransaction).merge(yangInstanceIdentifier, normalizedNode);
-
- localTransactionContext.executeMerge(yangInstanceIdentifier, normalizedNode, null);
- localTransactionContext.executeMerge(yangInstanceIdentifier, normalizedNode, null);
-
- verify(readWriteTransaction).merge(yangInstanceIdentifier, normalizedNode);
-
- doReadyWithExpectedError(error);
- }
-
- @Test
- public void testReadyWithDeleteError() {
- YangInstanceIdentifier yangInstanceIdentifier = YangInstanceIdentifier.of();
- RuntimeException error = new RuntimeException("mock");
- doThrow(error).when(readWriteTransaction).delete(yangInstanceIdentifier);
-
- localTransactionContext.executeDelete(yangInstanceIdentifier, null);
- localTransactionContext.executeDelete(yangInstanceIdentifier, null);
-
- verify(readWriteTransaction).delete(yangInstanceIdentifier);
-
- doReadyWithExpectedError(error);
- }
-
- private void doReadyWithExpectedError(final RuntimeException expError) {
- LocalThreePhaseCommitCohort mockCohort = mock(LocalThreePhaseCommitCohort.class);
- doReturn(akka.dispatch.Futures.successful(null)).when(mockCohort).initiateCoordinatedCommit(Optional.empty());
- doReturn(mockCohort).when(mockReadySupport).onTransactionReady(readWriteTransaction, expError);
-
- localTransactionContext.readyTransaction(null, Optional.empty());
- }
-}
* terms of the Eclipse Public License v1.0 which accompanies this distribution,
* and is available at http://www.eclipse.org/legal/epl-v10.html
*/
-
package org.opendaylight.controller.cluster.datastore;
import java.util.concurrent.atomic.AtomicReference;
+@Deprecated(since = "9.0.0", forRemoval = true)
interface OperationCallback {
- OperationCallback NO_OP_CALLBACK = new OperationCallback() {
- @Override
- public void run() {
- }
-
- @Override
- public void success() {
- }
-
- @Override
- public void failure() {
- }
-
- @Override
- public void pause() {
- }
-
- @Override
- public void resume() {
- }
- };
-
class Reference extends AtomicReference<OperationCallback> {
private static final long serialVersionUID = 1L;
- Reference(OperationCallback initialValue) {
+ Reference(final OperationCallback initialValue) {
super(initialValue);
}
}
+++ /dev/null
-/*
- * Copyright (c) 2017 Pantheon Technologies, s.r.o. and others. All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-package org.opendaylight.controller.cluster.datastore;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-import static org.mockito.Mockito.mock;
-
-import akka.actor.ActorRef;
-import akka.actor.Status.Failure;
-import akka.dispatch.ExecutionContexts;
-import akka.dispatch.OnComplete;
-import akka.testkit.javadsl.TestKit;
-import com.google.common.util.concurrent.MoreExecutors;
-import com.google.common.util.concurrent.SettableFuture;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-import org.junit.Before;
-import org.junit.Test;
-import org.mockito.Mockito;
-import org.opendaylight.controller.cluster.access.concepts.ClientIdentifier;
-import org.opendaylight.controller.cluster.access.concepts.FrontendIdentifier;
-import org.opendaylight.controller.cluster.access.concepts.FrontendType;
-import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier;
-import org.opendaylight.controller.cluster.access.concepts.MemberName;
-import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
-import org.opendaylight.controller.cluster.datastore.config.Configuration;
-import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
-import org.opendaylight.controller.cluster.datastore.messages.DataExists;
-import org.opendaylight.controller.cluster.datastore.utils.ActorUtils;
-import scala.concurrent.Await;
-import scala.concurrent.Future;
-import scala.concurrent.duration.FiniteDuration;
-
-/**
- * Test whether RmoteTransactionContext operates correctly.
- */
-public class RemoteTransactionContextTest extends AbstractActorTest {
- private static final TransactionIdentifier TX_ID = new TransactionIdentifier(new LocalHistoryIdentifier(
- ClientIdentifier.create(FrontendIdentifier.create(MemberName.forName("test"), FrontendType.forName("test")), 0),
- 0), 0);
-
- private OperationLimiter limiter;
- private RemoteTransactionContext txContext;
- private ActorUtils actorUtils;
- private TestKit kit;
-
- @Before
- public void before() {
- kit = new TestKit(getSystem());
- actorUtils = Mockito.spy(new ActorUtils(getSystem(), kit.getRef(), mock(ClusterWrapper.class),
- mock(Configuration.class)));
- limiter = new OperationLimiter(TX_ID, 4, 0);
- txContext = new RemoteTransactionContext(TX_ID, actorUtils.actorSelection(kit.getRef().path()), actorUtils,
- DataStoreVersions.CURRENT_VERSION, limiter);
- txContext.operationHandOffComplete();
- }
-
- /**
- * OperationLimiter should be correctly released when a failure, like AskTimeoutException occurs. Future reads
- * need to complete immediately with the failure and modifications should not be throttled and thrown away
- * immediately.
- */
- @Test
- public void testLimiterOnFailure() throws TimeoutException, InterruptedException {
- txContext.executeDelete(null, null);
- txContext.executeDelete(null, null);
- assertEquals(2, limiter.availablePermits());
-
- final Future<Object> sendFuture = txContext.sendBatchedModifications();
- assertEquals(2, limiter.availablePermits());
-
- BatchedModifications msg = kit.expectMsgClass(BatchedModifications.class);
- assertEquals(2, msg.getModifications().size());
- assertEquals(1, msg.getTotalMessagesSent());
- sendReply(new Failure(new NullPointerException()));
- assertFuture(sendFuture, new OnComplete<>() {
- @Override
- public void onComplete(final Throwable failure, final Object success) {
- assertTrue(failure instanceof NullPointerException);
- assertEquals(4, limiter.availablePermits());
-
- // The transaction has failed, no throttling should occur
- txContext.executeDelete(null, null);
- assertEquals(4, limiter.availablePermits());
-
- // Executing a read should result in immediate failure
- final SettableFuture<Boolean> readFuture = SettableFuture.create();
- txContext.executeRead(new DataExists(), readFuture, null);
- assertTrue(readFuture.isDone());
- try {
- readFuture.get();
- fail("Read future did not fail");
- } catch (ExecutionException | InterruptedException e) {
- assertTrue(e.getCause() instanceof NullPointerException);
- }
- }
- });
-
- final Future<Object> commitFuture = txContext.directCommit(null);
-
- msg = kit.expectMsgClass(BatchedModifications.class);
- // Modification should have been thrown away by the dropped transmit induced by executeRead()
- assertEquals(0, msg.getModifications().size());
- assertTrue(msg.isDoCommitOnReady());
- assertTrue(msg.isReady());
- assertEquals(2, msg.getTotalMessagesSent());
- sendReply(new Failure(new IllegalStateException()));
- assertFuture(commitFuture, new OnComplete<>() {
- @Override
- public void onComplete(final Throwable failure, final Object success) {
- assertTrue(failure instanceof IllegalStateException);
- }
- });
-
- kit.expectNoMessage();
- }
-
- /**
- * OperationLimiter gives up throttling at some point -- {@link RemoteTransactionContext} needs to deal with that
- * case, too.
- */
- @Test
- public void testLimiterOnOverflowFailure() throws TimeoutException, InterruptedException {
- txContext.executeDelete(null, null);
- txContext.executeDelete(null, null);
- txContext.executeDelete(null, null);
- txContext.executeDelete(null, null);
- assertEquals(0, limiter.availablePermits());
- txContext.executeDelete(null, null);
- // Last acquire should have failed ...
- assertEquals(0, limiter.availablePermits());
-
- final Future<Object> future = txContext.sendBatchedModifications();
- assertEquals(0, limiter.availablePermits());
-
- BatchedModifications msg = kit.expectMsgClass(BatchedModifications.class);
- // ... so we are sending 5 modifications ...
- assertEquals(5, msg.getModifications().size());
- assertEquals(1, msg.getTotalMessagesSent());
- sendReply(new Failure(new NullPointerException()));
-
- assertFuture(future, new OnComplete<>() {
- @Override
- public void onComplete(final Throwable failure, final Object success) {
- assertTrue(failure instanceof NullPointerException);
- // ... but they account for only 4 permits.
- assertEquals(4, limiter.availablePermits());
- }
- });
-
- kit.expectNoMessage();
- }
-
- private void sendReply(final Object message) {
- final ActorRef askActor = kit.getLastSender();
- kit.watch(askActor);
- kit.reply(new Failure(new IllegalStateException()));
- kit.expectTerminated(askActor);
- }
-
- private static void assertFuture(final Future<Object> future, final OnComplete<Object> complete)
- throws TimeoutException, InterruptedException {
- Await.ready(future, FiniteDuration.apply(3, TimeUnit.SECONDS));
- future.onComplete(complete, ExecutionContexts.fromExecutor(MoreExecutors.directExecutor()));
- }
-}
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;
+import org.eclipse.jdt.annotation.NonNull;
import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
import org.opendaylight.controller.cluster.datastore.messages.AbortTransaction;
import org.opendaylight.controller.cluster.datastore.messages.AbortTransactionReply;
import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
import org.opendaylight.controller.cluster.datastore.utils.ActorUtils;
import org.opendaylight.mdsal.common.api.CommitInfo;
+import org.opendaylight.mdsal.dom.spi.store.DOMStoreThreePhaseCommitCohort;
import org.opendaylight.yangtools.yang.common.Empty;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* ThreePhaseCommitCohortProxy represents a set of remote cohort proxies.
*/
-public class ThreePhaseCommitCohortProxy extends AbstractThreePhaseCommitCohort<ActorSelection> {
-
+@Deprecated(since = "9.0.0", forRemoval = true)
+final class ThreePhaseCommitCohortProxy implements DOMStoreThreePhaseCommitCohort {
private static final Logger LOG = LoggerFactory.getLogger(ThreePhaseCommitCohortProxy.class);
+ private static final @NonNull ListenableFuture<Empty> IMMEDIATE_EMPTY_SUCCESS =
+ Futures.immediateFuture(Empty.value());
private static final MessageSupplier COMMIT_MESSAGE_SUPPLIER = new MessageSupplier() {
@Override
}
};
+ private static final OperationCallback NO_OP_CALLBACK = new OperationCallback() {
+ @Override
+ public void run() {
+ }
+
+ @Override
+ public void success() {
+ }
+
+ @Override
+ public void failure() {
+ }
+
+ @Override
+ public void pause() {
+ }
+
+ @Override
+ public void resume() {
+ }
+ };
+
+
private final ActorUtils actorUtils;
private final List<CohortInfo> cohorts;
private final SettableFuture<Empty> cohortsResolvedFuture = SettableFuture.create();
private final TransactionIdentifier transactionId;
private volatile OperationCallback commitOperationCallback;
- public ThreePhaseCommitCohortProxy(final ActorUtils actorUtils, final List<CohortInfo> cohorts,
+ ThreePhaseCommitCohortProxy(final ActorUtils actorUtils, final List<CohortInfo> cohorts,
final TransactionIdentifier transactionId) {
this.actorUtils = actorUtils;
this.cohorts = cohorts;
// it's the original exception that is the root cause and of more interest to the client.
return operation("abort", Empty.value(), ABORT_MESSAGE_SUPPLIER, AbortTransactionReply.class, false,
- OperationCallback.NO_OP_CALLBACK);
+ NO_OP_CALLBACK);
}
@Override
public ListenableFuture<? extends CommitInfo> commit() {
OperationCallback operationCallback = commitOperationCallback != null ? commitOperationCallback :
- OperationCallback.NO_OP_CALLBACK;
+ NO_OP_CALLBACK;
return operation("commit", CommitInfo.empty(), COMMIT_MESSAGE_SUPPLIER, CommitTransactionReply.class, true,
operationCallback);
}, actorUtils.getClientDispatcher());
}
- @Override
- List<Future<ActorSelection>> getCohortFutures() {
- List<Future<ActorSelection>> cohortFutures = new ArrayList<>(cohorts.size());
- for (CohortInfo info: cohorts) {
- cohortFutures.add(info.getActorFuture());
- }
-
- return cohortFutures;
- }
-
static class CohortInfo {
private final Future<ActorSelection> actorFuture;
private final Supplier<Short> actorVersionSupplier;
import org.opendaylight.controller.cluster.raft.TestActorFactory;
import org.opendaylight.controller.cluster.raft.utils.DoNothingActor;
+@Deprecated(since = "9.0.0", forRemoval = true)
@RunWith(MockitoJUnitRunner.StrictStubs.class)
public class ThreePhaseCommitCohortProxyTest extends AbstractActorTest {
static class TestException extends RuntimeException {
+++ /dev/null
-/*
- * Copyright (c) 2014, 2015 Cisco Systems, Inc. and others. All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-
-package org.opendaylight.controller.cluster.datastore;
-
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.ArgumentMatchers.eq;
-import static org.mockito.ArgumentMatchers.isA;
-import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.never;
-import static org.mockito.Mockito.timeout;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-import static org.opendaylight.controller.cluster.datastore.TransactionType.READ_WRITE;
-import static org.opendaylight.controller.cluster.datastore.TransactionType.WRITE_ONLY;
-
-import akka.actor.ActorRef;
-import akka.util.Timeout;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicReference;
-import java.util.function.Function;
-import org.junit.Assert;
-import org.junit.Test;
-import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier;
-import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
-import org.opendaylight.controller.cluster.datastore.modification.WriteModification;
-import org.opendaylight.controller.cluster.datastore.shardstrategy.DefaultShardStrategy;
-import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
-import org.opendaylight.mdsal.dom.spi.store.DOMStoreReadTransaction;
-import org.opendaylight.mdsal.dom.spi.store.DOMStoreReadWriteTransaction;
-import org.opendaylight.mdsal.dom.spi.store.DOMStoreTransaction;
-import org.opendaylight.mdsal.dom.spi.store.DOMStoreWriteTransaction;
-import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
-import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
-import scala.concurrent.Promise;
-
-public class TransactionChainProxyTest extends AbstractTransactionProxyTest {
- private LocalHistoryIdentifier historyId;
-
- @Override
- public void setUp() {
- super.setUp();
- historyId = MockIdentifiers.historyIdentifier(TransactionChainProxyTest.class, memberName);
- }
-
- @SuppressWarnings("resource")
- @Test
- public void testNewReadOnlyTransaction() {
-
- DOMStoreTransaction dst = new TransactionChainProxy(mockComponentFactory, historyId).newReadOnlyTransaction();
- Assert.assertTrue(dst instanceof DOMStoreReadTransaction);
-
- }
-
- @SuppressWarnings("resource")
- @Test
- public void testNewReadWriteTransaction() {
- DOMStoreTransaction dst = new TransactionChainProxy(mockComponentFactory, historyId).newReadWriteTransaction();
- Assert.assertTrue(dst instanceof DOMStoreReadWriteTransaction);
-
- }
-
- @SuppressWarnings("resource")
- @Test
- public void testNewWriteOnlyTransaction() {
- DOMStoreTransaction dst = new TransactionChainProxy(mockComponentFactory, historyId).newWriteOnlyTransaction();
- Assert.assertTrue(dst instanceof DOMStoreWriteTransaction);
-
- }
-
- @SuppressWarnings("unchecked")
- @Test
- public void testClose() {
- new TransactionChainProxy(mockComponentFactory, historyId).close();
-
- verify(mockActorContext, times(1)).broadcast(any(Function.class), any(Class.class));
- }
-
- @Test
- public void testRateLimitingUsedInReadWriteTxCreation() {
- try (TransactionChainProxy txChainProxy = new TransactionChainProxy(mockComponentFactory, historyId)) {
-
- txChainProxy.newReadWriteTransaction();
-
- verify(mockActorContext, times(1)).acquireTxCreationPermit();
- }
- }
-
- @Test
- public void testRateLimitingUsedInWriteOnlyTxCreation() {
- try (TransactionChainProxy txChainProxy = new TransactionChainProxy(mockComponentFactory, historyId)) {
-
- txChainProxy.newWriteOnlyTransaction();
-
- verify(mockActorContext, times(1)).acquireTxCreationPermit();
- }
- }
-
- @Test
- public void testRateLimitingNotUsedInReadOnlyTxCreation() {
- try (TransactionChainProxy txChainProxy = new TransactionChainProxy(mockComponentFactory, historyId)) {
-
- txChainProxy.newReadOnlyTransaction();
-
- verify(mockActorContext, times(0)).acquireTxCreationPermit();
- }
- }
-
- /**
- * Tests 2 successive chained write-only transactions and verifies the second transaction isn't
- * initiated until the first one completes its read future.
- */
- @Test
- @SuppressWarnings("checkstyle:IllegalCatch")
- public void testChainedWriteOnlyTransactions() throws Exception {
- dataStoreContextBuilder.writeOnlyTransactionOptimizationsEnabled(true);
-
- try (TransactionChainProxy txChainProxy = new TransactionChainProxy(mockComponentFactory, historyId)) {
-
- ActorRef txActorRef1 = setupActorContextWithoutInitialCreateTransaction(getSystem());
-
- Promise<Object> batchedReplyPromise1 = akka.dispatch.Futures.promise();
- doReturn(batchedReplyPromise1.future()).when(mockActorContext).executeOperationAsync(
- eq(actorSelection(txActorRef1)), isA(BatchedModifications.class), any(Timeout.class));
-
- DOMStoreWriteTransaction writeTx1 = txChainProxy.newWriteOnlyTransaction();
-
- NormalizedNode writeNode1 = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
- writeTx1.write(TestModel.TEST_PATH, writeNode1);
-
- writeTx1.ready();
-
- verify(mockActorContext, times(1)).findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD));
-
- verifyOneBatchedModification(txActorRef1, new WriteModification(TestModel.TEST_PATH, writeNode1), true);
-
- ActorRef txActorRef2 = setupActorContextWithoutInitialCreateTransaction(getSystem());
-
- expectBatchedModifications(txActorRef2, 1);
-
- final NormalizedNode writeNode2 = ImmutableNodes.containerNode(TestModel.OUTER_LIST_QNAME);
-
- final DOMStoreWriteTransaction writeTx2 = txChainProxy.newWriteOnlyTransaction();
-
- final AtomicReference<Exception> caughtEx = new AtomicReference<>();
- final CountDownLatch write2Complete = new CountDownLatch(1);
- new Thread(() -> {
- try {
- writeTx2.write(TestModel.OUTER_LIST_PATH, writeNode2);
- } catch (Exception e) {
- caughtEx.set(e);
- } finally {
- write2Complete.countDown();
- }
- }).start();
-
- assertTrue("Tx 2 write should've completed", write2Complete.await(5, TimeUnit.SECONDS));
-
- if (caughtEx.get() != null) {
- throw caughtEx.get();
- }
-
- try {
- verify(mockActorContext, times(1)).findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD));
- } catch (AssertionError e) {
- fail("Tx 2 should not have initiated until the Tx 1's ready future completed");
- }
-
- batchedReplyPromise1.success(readyTxReply(txActorRef1.path().toString()).value().get().get());
-
- // Tx 2 should've proceeded to find the primary shard.
- verify(mockActorContext, timeout(5000).times(2)).findPrimaryShardAsync(
- eq(DefaultShardStrategy.DEFAULT_SHARD));
- }
- }
-
- /**
- * Tests 2 successive chained read-write transactions and verifies the second transaction isn't
- * initiated until the first one completes its read future.
- */
- @Test
- @SuppressWarnings("checkstyle:IllegalCatch")
- public void testChainedReadWriteTransactions() throws Exception {
- try (TransactionChainProxy txChainProxy = new TransactionChainProxy(mockComponentFactory, historyId)) {
-
- ActorRef txActorRef1 = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
-
- expectBatchedModifications(txActorRef1, 1);
-
- Promise<Object> readyReplyPromise1 = akka.dispatch.Futures.promise();
- doReturn(readyReplyPromise1.future()).when(mockActorContext).executeOperationAsync(
- eq(actorSelection(txActorRef1)), isA(BatchedModifications.class), any(Timeout.class));
-
- DOMStoreWriteTransaction writeTx1 = txChainProxy.newReadWriteTransaction();
-
- NormalizedNode writeNode1 = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
- writeTx1.write(TestModel.TEST_PATH, writeNode1);
-
- writeTx1.ready();
-
- verifyOneBatchedModification(txActorRef1, new WriteModification(TestModel.TEST_PATH, writeNode1), true);
-
- String tx2MemberName = "mock-member";
- ActorRef shardActorRef2 = setupActorContextWithoutInitialCreateTransaction(getSystem());
- ActorRef txActorRef2 = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE,
- DataStoreVersions.CURRENT_VERSION, tx2MemberName, shardActorRef2);
-
- expectBatchedModifications(txActorRef2, 1);
-
- final NormalizedNode writeNode2 = ImmutableNodes.containerNode(TestModel.OUTER_LIST_QNAME);
-
- final DOMStoreWriteTransaction writeTx2 = txChainProxy.newReadWriteTransaction();
-
- final AtomicReference<Exception> caughtEx = new AtomicReference<>();
- final CountDownLatch write2Complete = new CountDownLatch(1);
- new Thread(() -> {
- try {
- writeTx2.write(TestModel.OUTER_LIST_PATH, writeNode2);
- } catch (Exception e) {
- caughtEx.set(e);
- } finally {
- write2Complete.countDown();
- }
- }).start();
-
- assertTrue("Tx 2 write should've completed", write2Complete.await(5, TimeUnit.SECONDS));
-
- if (caughtEx.get() != null) {
- throw caughtEx.get();
- }
-
- try {
- verify(mockActorContext, never()).executeOperationAsync(
- eq(getSystem().actorSelection(shardActorRef2.path())),
- eqCreateTransaction(tx2MemberName, READ_WRITE));
- } catch (AssertionError e) {
- fail("Tx 2 should not have initiated until the Tx 1's ready future completed");
- }
-
- readyReplyPromise1.success(readyTxReply(txActorRef1.path().toString()).value().get().get());
-
- verify(mockActorContext, timeout(5000)).executeOperationAsync(
- eq(getSystem().actorSelection(shardActorRef2.path())),
- eqCreateTransaction(tx2MemberName, READ_WRITE), any(Timeout.class));
- }
- }
-
- @Test(expected = IllegalStateException.class)
- public void testChainedWriteTransactionsWithPreviousTxNotReady() {
- ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY);
-
- expectBatchedModifications(actorRef, 1);
-
- try (TransactionChainProxy txChainProxy = new TransactionChainProxy(mockComponentFactory, historyId)) {
-
- DOMStoreWriteTransaction writeTx1 = txChainProxy.newWriteOnlyTransaction();
-
- NormalizedNode writeNode1 = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
- writeTx1.write(TestModel.TEST_PATH, writeNode1);
-
- txChainProxy.newWriteOnlyTransaction();
- }
- }
-}
+++ /dev/null
-/*
- * Copyright (c) 2014, 2015 Cisco Systems, Inc. and others. All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-package org.opendaylight.controller.cluster.datastore;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertThrows;
-import static org.junit.Assert.assertTrue;
-import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.ArgumentMatchers.anyString;
-import static org.mockito.ArgumentMatchers.eq;
-import static org.mockito.ArgumentMatchers.isA;
-import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.doThrow;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.verify;
-import static org.opendaylight.controller.cluster.datastore.TransactionType.READ_ONLY;
-import static org.opendaylight.controller.cluster.datastore.TransactionType.READ_WRITE;
-import static org.opendaylight.controller.cluster.datastore.TransactionType.WRITE_ONLY;
-
-import akka.actor.ActorRef;
-import akka.actor.ActorSelection;
-import akka.actor.ActorSystem;
-import akka.actor.Props;
-import akka.dispatch.Futures;
-import akka.util.Timeout;
-import com.google.common.base.Throwables;
-import com.google.common.collect.ImmutableSortedSet;
-import com.google.common.collect.Sets;
-import com.google.common.util.concurrent.FluentFuture;
-import com.google.common.util.concurrent.FutureCallback;
-import com.google.common.util.concurrent.MoreExecutors;
-import com.google.common.util.concurrent.Uninterruptibles;
-import java.util.Collection;
-import java.util.List;
-import java.util.Optional;
-import java.util.SortedSet;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicReference;
-import org.junit.Assert;
-import org.junit.Test;
-import org.mockito.ArgumentCaptor;
-import org.mockito.InOrder;
-import org.mockito.Mockito;
-import org.opendaylight.controller.cluster.access.concepts.MemberName;
-import org.opendaylight.controller.cluster.datastore.config.Configuration;
-import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
-import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException;
-import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException;
-import org.opendaylight.controller.cluster.datastore.exceptions.TimeoutException;
-import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
-import org.opendaylight.controller.cluster.datastore.messages.CloseTransaction;
-import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
-import org.opendaylight.controller.cluster.datastore.messages.CreateTransactionReply;
-import org.opendaylight.controller.cluster.datastore.messages.PrimaryShardInfo;
-import org.opendaylight.controller.cluster.datastore.messages.ReadyLocalTransaction;
-import org.opendaylight.controller.cluster.datastore.modification.DeleteModification;
-import org.opendaylight.controller.cluster.datastore.modification.MergeModification;
-import org.opendaylight.controller.cluster.datastore.modification.WriteModification;
-import org.opendaylight.controller.cluster.datastore.shardstrategy.DefaultShardStrategy;
-import org.opendaylight.controller.cluster.datastore.utils.NormalizedNodeAggregatorTest;
-import org.opendaylight.controller.cluster.raft.utils.DoNothingActor;
-import org.opendaylight.controller.md.cluster.datastore.model.CarsModel;
-import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper;
-import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
-import org.opendaylight.mdsal.common.api.ReadFailedException;
-import org.opendaylight.mdsal.dom.spi.store.DOMStoreThreePhaseCommitCohort;
-import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
-import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
-import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
-import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
-import org.opendaylight.yangtools.yang.data.tree.api.DataTree;
-import org.opendaylight.yangtools.yang.data.tree.api.DataTreeModification;
-import org.opendaylight.yangtools.yang.data.tree.api.DataTreeSnapshot;
-import org.opendaylight.yangtools.yang.model.api.EffectiveModelContext;
-import scala.concurrent.Promise;
-
-@SuppressWarnings({"resource", "checkstyle:IllegalThrows", "checkstyle:AvoidHidingCauseException"})
-public class TransactionProxyTest extends AbstractTransactionProxyTest {
-
- @SuppressWarnings("serial")
- static class TestException extends RuntimeException {
- }
-
- interface Invoker {
- FluentFuture<?> invoke(TransactionProxy proxy);
- }
-
- @Test
- public void testRead() throws Exception {
- ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_ONLY);
-
- TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_ONLY);
-
- doReturn(readDataReply(null)).when(mockActorContext).executeOperationAsync(
- eq(actorSelection(actorRef)), eqReadData(), any(Timeout.class));
-
- assertEquals(Optional.empty(), transactionProxy.read(TestModel.TEST_PATH).get(5, TimeUnit.SECONDS));
-
- NormalizedNode expectedNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
-
- doReturn(readDataReply(expectedNode)).when(mockActorContext).executeOperationAsync(
- eq(actorSelection(actorRef)), eqReadData(), any(Timeout.class));
-
- assertEquals(Optional.of(expectedNode), transactionProxy.read(TestModel.TEST_PATH).get(5, TimeUnit.SECONDS));
- }
-
- @Test(expected = ReadFailedException.class)
- public void testReadWithInvalidReplyMessageType() throws Throwable {
- ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_ONLY);
-
- doReturn(Futures.successful(new Object())).when(mockActorContext)
- .executeOperationAsync(eq(actorSelection(actorRef)), eqReadData(), any(Timeout.class));
-
- TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_ONLY);
-
- try {
- transactionProxy.read(TestModel.TEST_PATH).get(5, TimeUnit.SECONDS);
- } catch (ExecutionException e) {
- throw e.getCause();
- }
- }
-
- @Test(expected = TestException.class)
- public void testReadWithAsyncRemoteOperatonFailure() throws Throwable {
- ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_ONLY);
-
- doReturn(Futures.failed(new TestException())).when(mockActorContext)
- .executeOperationAsync(eq(actorSelection(actorRef)), eqReadData(), any(Timeout.class));
-
- TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_ONLY);
-
- propagateReadFailedExceptionCause(transactionProxy.read(TestModel.TEST_PATH));
- }
-
- private void testExceptionOnInitialCreateTransaction(final Exception exToThrow, final Invoker invoker)
- throws Throwable {
- ActorRef actorRef = getSystem().actorOf(Props.create(DoNothingActor.class));
-
- if (exToThrow instanceof PrimaryNotFoundException) {
- doReturn(Futures.failed(exToThrow)).when(mockActorContext).findPrimaryShardAsync(anyString());
- } else {
- doReturn(primaryShardInfoReply(getSystem(), actorRef)).when(mockActorContext)
- .findPrimaryShardAsync(anyString());
- }
-
- doReturn(Futures.failed(exToThrow)).when(mockActorContext).executeOperationAsync(
- any(ActorSelection.class), any(), any(Timeout.class));
-
- TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_ONLY);
-
- propagateReadFailedExceptionCause(invoker.invoke(transactionProxy));
- }
-
- private void testReadWithExceptionOnInitialCreateTransaction(final Exception exToThrow) throws Throwable {
- testExceptionOnInitialCreateTransaction(exToThrow, proxy -> proxy.read(TestModel.TEST_PATH));
- }
-
- @Test(expected = PrimaryNotFoundException.class)
- public void testReadWhenAPrimaryNotFoundExceptionIsThrown() throws Throwable {
- testReadWithExceptionOnInitialCreateTransaction(new PrimaryNotFoundException("test"));
- }
-
- @Test(expected = TestException.class)
- public void testReadWhenATimeoutExceptionIsThrown() throws Throwable {
- testReadWithExceptionOnInitialCreateTransaction(new TimeoutException("test",
- new TestException()));
- }
-
- @Test(expected = TestException.class)
- public void testReadWhenAnyOtherExceptionIsThrown() throws Throwable {
- testReadWithExceptionOnInitialCreateTransaction(new TestException());
- }
-
- @Test
- public void testReadWithPriorRecordingOperationSuccessful() throws Exception {
- ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
-
- NormalizedNode expectedNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
-
- expectBatchedModifications(actorRef, 1);
-
- doReturn(readDataReply(expectedNode)).when(mockActorContext).executeOperationAsync(
- eq(actorSelection(actorRef)), eqReadData(), any(Timeout.class));
-
- TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_WRITE);
-
- transactionProxy.write(TestModel.TEST_PATH, expectedNode);
-
- assertEquals(Optional.of(expectedNode), transactionProxy.read(TestModel.TEST_PATH).get(5, TimeUnit.SECONDS));
-
- InOrder inOrder = Mockito.inOrder(mockActorContext);
- inOrder.verify(mockActorContext).executeOperationAsync(
- eq(actorSelection(actorRef)), isA(BatchedModifications.class), any(Timeout.class));
-
- inOrder.verify(mockActorContext).executeOperationAsync(
- eq(actorSelection(actorRef)), eqReadData(), any(Timeout.class));
- }
-
- @Test
- public void testReadPreConditionCheck() {
- TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, WRITE_ONLY);
- assertThrows(IllegalStateException.class, () -> transactionProxy.read(TestModel.TEST_PATH));
- }
-
- @Test(expected = IllegalArgumentException.class)
- public void testInvalidCreateTransactionReply() throws Throwable {
- ActorRef actorRef = getSystem().actorOf(Props.create(DoNothingActor.class));
-
- doReturn(getSystem().actorSelection(actorRef.path())).when(mockActorContext)
- .actorSelection(actorRef.path().toString());
-
- doReturn(primaryShardInfoReply(getSystem(), actorRef)).when(mockActorContext)
- .findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD));
-
- doReturn(Futures.successful(new Object())).when(mockActorContext).executeOperationAsync(
- eq(getSystem().actorSelection(actorRef.path())), eqCreateTransaction(memberName, READ_ONLY),
- any(Timeout.class));
-
- TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_ONLY);
-
- propagateReadFailedExceptionCause(transactionProxy.read(TestModel.TEST_PATH));
- }
-
- @Test
- public void testExists() throws Exception {
- ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_ONLY);
-
- TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_ONLY);
-
- doReturn(dataExistsReply(false)).when(mockActorContext).executeOperationAsync(
- eq(actorSelection(actorRef)), eqDataExists(), any(Timeout.class));
-
- Boolean exists = transactionProxy.exists(TestModel.TEST_PATH).get();
-
- assertEquals("Exists response", Boolean.FALSE, exists);
-
- doReturn(dataExistsReply(true)).when(mockActorContext).executeOperationAsync(
- eq(actorSelection(actorRef)), eqDataExists(), any(Timeout.class));
-
- exists = transactionProxy.exists(TestModel.TEST_PATH).get();
-
- assertEquals("Exists response", Boolean.TRUE, exists);
- }
-
- @Test(expected = PrimaryNotFoundException.class)
- public void testExistsWhenAPrimaryNotFoundExceptionIsThrown() throws Throwable {
- testExceptionOnInitialCreateTransaction(new PrimaryNotFoundException("test"),
- proxy -> proxy.exists(TestModel.TEST_PATH));
- }
-
- @Test(expected = ReadFailedException.class)
- public void testExistsWithInvalidReplyMessageType() throws Throwable {
- ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_ONLY);
-
- doReturn(Futures.successful(new Object())).when(mockActorContext)
- .executeOperationAsync(eq(actorSelection(actorRef)), eqDataExists(), any(Timeout.class));
-
- TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_ONLY);
-
- try {
- transactionProxy.exists(TestModel.TEST_PATH).get(5, TimeUnit.SECONDS);
- } catch (ExecutionException e) {
- throw e.getCause();
- }
- }
-
- @Test(expected = TestException.class)
- public void testExistsWithAsyncRemoteOperatonFailure() throws Throwable {
- ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_ONLY);
-
- doReturn(Futures.failed(new TestException())).when(mockActorContext)
- .executeOperationAsync(eq(actorSelection(actorRef)), eqDataExists(), any(Timeout.class));
-
- TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_ONLY);
-
- propagateReadFailedExceptionCause(transactionProxy.exists(TestModel.TEST_PATH));
- }
-
- @Test
- public void testExistsWithPriorRecordingOperationSuccessful() throws Exception {
- ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
-
- NormalizedNode nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
-
- expectBatchedModifications(actorRef, 1);
-
- doReturn(dataExistsReply(true)).when(mockActorContext).executeOperationAsync(
- eq(actorSelection(actorRef)), eqDataExists(), any(Timeout.class));
-
- TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_WRITE);
-
- transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
-
- Boolean exists = transactionProxy.exists(TestModel.TEST_PATH).get();
-
- assertEquals("Exists response", Boolean.TRUE, exists);
-
- InOrder inOrder = Mockito.inOrder(mockActorContext);
- inOrder.verify(mockActorContext).executeOperationAsync(
- eq(actorSelection(actorRef)), isA(BatchedModifications.class), any(Timeout.class));
-
- inOrder.verify(mockActorContext).executeOperationAsync(
- eq(actorSelection(actorRef)), eqDataExists(), any(Timeout.class));
- }
-
- @Test(expected = IllegalStateException.class)
- public void testExistsPreConditionCheck() {
- TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, WRITE_ONLY);
- transactionProxy.exists(TestModel.TEST_PATH);
- }
-
- @Test
- public void testWrite() {
- dataStoreContextBuilder.shardBatchedModificationCount(1);
- ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY);
-
- NormalizedNode nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
-
- expectBatchedModifications(actorRef, 1);
-
- TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, WRITE_ONLY);
-
- transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
-
- verifyOneBatchedModification(actorRef, new WriteModification(TestModel.TEST_PATH, nodeToWrite), false);
- }
-
- @Test
- @SuppressWarnings("checkstyle:IllegalCatch")
- public void testWriteAfterAsyncRead() throws Exception {
- ActorRef actorRef = setupActorContextWithoutInitialCreateTransaction(getSystem(),
- DefaultShardStrategy.DEFAULT_SHARD);
-
- Promise<Object> createTxPromise = akka.dispatch.Futures.promise();
- doReturn(createTxPromise).when(mockActorContext).executeOperationAsync(
- eq(getSystem().actorSelection(actorRef.path())),
- eqCreateTransaction(memberName, READ_WRITE), any(Timeout.class));
-
- doReturn(readDataReply(null)).when(mockActorContext).executeOperationAsync(
- eq(actorSelection(actorRef)), eqReadData(), any(Timeout.class));
-
- expectBatchedModificationsReady(actorRef);
-
- final NormalizedNode nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
-
- final TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_WRITE);
-
- final CountDownLatch readComplete = new CountDownLatch(1);
- final AtomicReference<Throwable> caughtEx = new AtomicReference<>();
- com.google.common.util.concurrent.Futures.addCallback(transactionProxy.read(TestModel.TEST_PATH),
- new FutureCallback<Optional<NormalizedNode>>() {
- @Override
- public void onSuccess(final Optional<NormalizedNode> result) {
- try {
- transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
- } catch (Exception e) {
- caughtEx.set(e);
- } finally {
- readComplete.countDown();
- }
- }
-
- @Override
- public void onFailure(final Throwable failure) {
- caughtEx.set(failure);
- readComplete.countDown();
- }
- }, MoreExecutors.directExecutor());
-
- createTxPromise.success(createTransactionReply(actorRef, DataStoreVersions.CURRENT_VERSION));
-
- Uninterruptibles.awaitUninterruptibly(readComplete, 5, TimeUnit.SECONDS);
-
- final Throwable t = caughtEx.get();
- if (t != null) {
- Throwables.propagateIfPossible(t, Exception.class);
- throw new RuntimeException(t);
- }
-
- // This sends the batched modification.
- transactionProxy.ready();
-
- verifyOneBatchedModification(actorRef, new WriteModification(TestModel.TEST_PATH, nodeToWrite), true);
- }
-
- @Test(expected = IllegalStateException.class)
- public void testWritePreConditionCheck() {
- TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_ONLY);
- transactionProxy.write(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
- }
-
- @Test(expected = IllegalStateException.class)
- public void testWriteAfterReadyPreConditionCheck() {
- TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, WRITE_ONLY);
-
- transactionProxy.ready();
-
- transactionProxy.write(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
- }
-
- @Test
- public void testMerge() {
- dataStoreContextBuilder.shardBatchedModificationCount(1);
- ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY);
-
- NormalizedNode nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
-
- expectBatchedModifications(actorRef, 1);
-
- TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, WRITE_ONLY);
-
- transactionProxy.merge(TestModel.TEST_PATH, nodeToWrite);
-
- verifyOneBatchedModification(actorRef, new MergeModification(TestModel.TEST_PATH, nodeToWrite), false);
- }
-
- @Test
- public void testDelete() {
- dataStoreContextBuilder.shardBatchedModificationCount(1);
- ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY);
-
- expectBatchedModifications(actorRef, 1);
-
- TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, WRITE_ONLY);
-
- transactionProxy.delete(TestModel.TEST_PATH);
-
- verifyOneBatchedModification(actorRef, new DeleteModification(TestModel.TEST_PATH), false);
- }
-
- @Test
- public void testReadWrite() {
- ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
-
- final NormalizedNode nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
-
- doReturn(readDataReply(null)).when(mockActorContext).executeOperationAsync(
- eq(actorSelection(actorRef)), eqReadData(), any(Timeout.class));
-
- expectBatchedModifications(actorRef, 1);
-
- TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_WRITE);
-
- transactionProxy.read(TestModel.TEST_PATH);
-
- transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
-
- transactionProxy.read(TestModel.TEST_PATH);
-
- transactionProxy.read(TestModel.TEST_PATH);
-
- List<BatchedModifications> batchedModifications = captureBatchedModifications(actorRef);
- assertEquals("Captured BatchedModifications count", 1, batchedModifications.size());
-
- verifyBatchedModifications(batchedModifications.get(0), false,
- new WriteModification(TestModel.TEST_PATH, nodeToWrite));
- }
-
- @Test
- public void testReadyWithReadWrite() {
- ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
-
- final NormalizedNode nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
-
- doReturn(readDataReply(null)).when(mockActorContext).executeOperationAsync(
- eq(actorSelection(actorRef)), eqReadData(), any(Timeout.class));
-
- expectBatchedModificationsReady(actorRef, true);
-
- TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_WRITE);
-
- transactionProxy.read(TestModel.TEST_PATH);
-
- transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
-
- DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
-
- assertTrue(ready instanceof SingleCommitCohortProxy);
-
- verifyCohortFutures((SingleCommitCohortProxy)ready, new CommitTransactionReply().toSerializable());
-
- List<BatchedModifications> batchedModifications = captureBatchedModifications(actorRef);
- assertEquals("Captured BatchedModifications count", 1, batchedModifications.size());
-
- verifyBatchedModifications(batchedModifications.get(0), true, true,
- new WriteModification(TestModel.TEST_PATH, nodeToWrite));
-
- assertEquals("getTotalMessageCount", 1, batchedModifications.get(0).getTotalMessagesSent());
- }
-
- @Test
- public void testReadyWithNoModifications() {
- ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
-
- doReturn(readDataReply(null)).when(mockActorContext).executeOperationAsync(
- eq(actorSelection(actorRef)), eqReadData(), any(Timeout.class));
-
- expectBatchedModificationsReady(actorRef, true);
-
- TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_WRITE);
-
- transactionProxy.read(TestModel.TEST_PATH);
-
- DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
-
- assertTrue(ready instanceof SingleCommitCohortProxy);
-
- verifyCohortFutures((SingleCommitCohortProxy)ready, new CommitTransactionReply().toSerializable());
-
- List<BatchedModifications> batchedModifications = captureBatchedModifications(actorRef);
- assertEquals("Captured BatchedModifications count", 1, batchedModifications.size());
-
- verifyBatchedModifications(batchedModifications.get(0), true, true);
- }
-
- @Test
- public void testReadyWithMultipleShardWrites() {
- ActorRef actorRef1 = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY);
-
- ActorRef actorRef2 = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY,
- TestModel.JUNK_QNAME.getLocalName());
-
- expectBatchedModificationsReady(actorRef1);
- expectBatchedModificationsReady(actorRef2);
-
- ActorRef actorRef3 = getSystem().actorOf(Props.create(DoNothingActor.class));
-
- doReturn(getSystem().actorSelection(actorRef3.path())).when(mockActorContext)
- .actorSelection(actorRef3.path().toString());
-
- doReturn(Futures.successful(newPrimaryShardInfo(actorRef3, createDataTree()))).when(mockActorContext)
- .findPrimaryShardAsync(eq(CarsModel.BASE_QNAME.getLocalName()));
-
- expectReadyLocalTransaction(actorRef3, false);
-
- TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, WRITE_ONLY);
-
- transactionProxy.write(TestModel.JUNK_PATH, ImmutableNodes.containerNode(TestModel.JUNK_QNAME));
- transactionProxy.write(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
- transactionProxy.write(CarsModel.BASE_PATH, ImmutableNodes.containerNode(CarsModel.BASE_QNAME));
-
- DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
-
- assertTrue(ready instanceof ThreePhaseCommitCohortProxy);
-
- verifyCohortFutures((ThreePhaseCommitCohortProxy)ready, actorSelection(actorRef1),
- actorSelection(actorRef2), actorSelection(actorRef3));
-
- SortedSet<String> expShardNames =
- ImmutableSortedSet.of(DefaultShardStrategy.DEFAULT_SHARD,
- TestModel.JUNK_QNAME.getLocalName(), CarsModel.BASE_QNAME.getLocalName());
-
- ArgumentCaptor<BatchedModifications> batchedMods = ArgumentCaptor.forClass(BatchedModifications.class);
- verify(mockActorContext).executeOperationAsync(
- eq(actorSelection(actorRef1)), batchedMods.capture(), any(Timeout.class));
- assertEquals("Participating shards", Optional.of(expShardNames),
- batchedMods.getValue().getParticipatingShardNames());
-
- batchedMods = ArgumentCaptor.forClass(BatchedModifications.class);
- verify(mockActorContext).executeOperationAsync(
- eq(actorSelection(actorRef2)), batchedMods.capture(), any(Timeout.class));
- assertEquals("Participating shards", Optional.of(expShardNames),
- batchedMods.getValue().getParticipatingShardNames());
-
- ArgumentCaptor<ReadyLocalTransaction> readyLocalTx = ArgumentCaptor.forClass(ReadyLocalTransaction.class);
- verify(mockActorContext).executeOperationAsync(
- eq(actorSelection(actorRef3)), readyLocalTx.capture(), any(Timeout.class));
- assertEquals("Participating shards", Optional.of(expShardNames),
- readyLocalTx.getValue().getParticipatingShardNames());
- }
-
- @Test
- public void testReadyWithWriteOnlyAndLastBatchPending() {
- dataStoreContextBuilder.writeOnlyTransactionOptimizationsEnabled(true);
-
- ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY);
-
- NormalizedNode nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
-
- expectBatchedModificationsReady(actorRef, true);
-
- TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, WRITE_ONLY);
-
- transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
-
- DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
-
- assertTrue(ready instanceof SingleCommitCohortProxy);
-
- verifyCohortFutures((SingleCommitCohortProxy)ready, new CommitTransactionReply().toSerializable());
-
- List<BatchedModifications> batchedModifications = captureBatchedModifications(actorRef);
- assertEquals("Captured BatchedModifications count", 1, batchedModifications.size());
-
- verifyBatchedModifications(batchedModifications.get(0), true, true,
- new WriteModification(TestModel.TEST_PATH, nodeToWrite));
- }
-
- @Test
- public void testReadyWithWriteOnlyAndLastBatchEmpty() {
- dataStoreContextBuilder.shardBatchedModificationCount(1).writeOnlyTransactionOptimizationsEnabled(true);
- ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY);
-
- NormalizedNode nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
-
- expectBatchedModificationsReady(actorRef, true);
-
- TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, WRITE_ONLY);
-
- transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
-
- DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
-
- assertTrue(ready instanceof SingleCommitCohortProxy);
-
- verifyCohortFutures((SingleCommitCohortProxy)ready, new CommitTransactionReply().toSerializable());
-
- List<BatchedModifications> batchedModifications = captureBatchedModifications(actorRef);
- assertEquals("Captured BatchedModifications count", 2, batchedModifications.size());
-
- verifyBatchedModifications(batchedModifications.get(0), false,
- new WriteModification(TestModel.TEST_PATH, nodeToWrite));
-
- verifyBatchedModifications(batchedModifications.get(1), true, true);
- }
-
- @Test
- public void testReadyWithReplyFailure() {
- dataStoreContextBuilder.writeOnlyTransactionOptimizationsEnabled(true);
-
- ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY);
-
- NormalizedNode nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
-
- expectFailedBatchedModifications(actorRef);
-
- TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, WRITE_ONLY);
-
- transactionProxy.merge(TestModel.TEST_PATH, nodeToWrite);
-
- DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
-
- assertTrue(ready instanceof SingleCommitCohortProxy);
-
- verifyCohortFutures((SingleCommitCohortProxy)ready, TestException.class);
- }
-
- @Test
- public void testReadyWithDebugContextEnabled() {
- dataStoreContextBuilder.transactionDebugContextEnabled(true);
-
- ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
-
- expectBatchedModificationsReady(actorRef, true);
-
- TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_WRITE);
-
- transactionProxy.merge(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
-
- DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
-
- assertTrue(ready instanceof DebugThreePhaseCommitCohort);
-
- verifyCohortFutures((DebugThreePhaseCommitCohort)ready, new CommitTransactionReply().toSerializable());
- }
-
- @Test
- public void testReadyWithLocalTransaction() {
- ActorRef shardActorRef = getSystem().actorOf(Props.create(DoNothingActor.class));
-
- doReturn(getSystem().actorSelection(shardActorRef.path())).when(mockActorContext)
- .actorSelection(shardActorRef.path().toString());
-
- doReturn(Futures.successful(newPrimaryShardInfo(shardActorRef, createDataTree()))).when(mockActorContext)
- .findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD));
-
- TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, WRITE_ONLY);
-
- expectReadyLocalTransaction(shardActorRef, true);
-
- NormalizedNode nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
- transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
-
- DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
- assertTrue(ready instanceof SingleCommitCohortProxy);
- verifyCohortFutures((SingleCommitCohortProxy)ready, new CommitTransactionReply().toSerializable());
-
- ArgumentCaptor<ReadyLocalTransaction> readyLocalTx = ArgumentCaptor.forClass(ReadyLocalTransaction.class);
- verify(mockActorContext).executeOperationAsync(
- eq(actorSelection(shardActorRef)), readyLocalTx.capture(), any(Timeout.class));
- assertFalse("Participating shards present", readyLocalTx.getValue().getParticipatingShardNames().isPresent());
- }
-
- @Test
- public void testReadyWithLocalTransactionWithFailure() {
- ActorRef shardActorRef = getSystem().actorOf(Props.create(DoNothingActor.class));
-
- doReturn(getSystem().actorSelection(shardActorRef.path())).when(mockActorContext)
- .actorSelection(shardActorRef.path().toString());
-
- DataTree mockDataTree = createDataTree();
- DataTreeModification mockModification = mockDataTree.takeSnapshot().newModification();
- doThrow(new RuntimeException("mock")).when(mockModification).ready();
-
- doReturn(Futures.successful(newPrimaryShardInfo(shardActorRef, mockDataTree))).when(mockActorContext)
- .findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD));
-
- TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, WRITE_ONLY);
-
- expectReadyLocalTransaction(shardActorRef, true);
-
- NormalizedNode nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
- transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
-
- DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
- assertTrue(ready instanceof SingleCommitCohortProxy);
- verifyCohortFutures((SingleCommitCohortProxy)ready, RuntimeException.class);
- }
-
- private void testWriteOnlyTxWithFindPrimaryShardFailure(final Exception toThrow) {
- doReturn(Futures.failed(toThrow)).when(mockActorContext).findPrimaryShardAsync(anyString());
-
- TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, WRITE_ONLY);
-
- NormalizedNode nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
-
- transactionProxy.merge(TestModel.TEST_PATH, nodeToWrite);
-
- transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
-
- transactionProxy.delete(TestModel.TEST_PATH);
-
- DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
-
- assertTrue(ready instanceof SingleCommitCohortProxy);
-
- verifyCohortFutures((SingleCommitCohortProxy)ready, toThrow.getClass());
- }
-
- @Test
- public void testWriteOnlyTxWithPrimaryNotFoundException() {
- testWriteOnlyTxWithFindPrimaryShardFailure(new PrimaryNotFoundException("mock"));
- }
-
- @Test
- public void testWriteOnlyTxWithNotInitializedException() {
- testWriteOnlyTxWithFindPrimaryShardFailure(new NotInitializedException("mock"));
- }
-
- @Test
- public void testWriteOnlyTxWithNoShardLeaderException() {
- testWriteOnlyTxWithFindPrimaryShardFailure(new NoShardLeaderException("mock"));
- }
-
- @Test
- public void testReadyWithInvalidReplyMessageType() {
- dataStoreContextBuilder.writeOnlyTransactionOptimizationsEnabled(true);
- ActorRef actorRef1 = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY);
-
- ActorRef actorRef2 = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY,
- TestModel.JUNK_QNAME.getLocalName());
-
- doReturn(Futures.successful(new Object())).when(mockActorContext).executeOperationAsync(
- eq(actorSelection(actorRef1)), isA(BatchedModifications.class), any(Timeout.class));
-
- expectBatchedModificationsReady(actorRef2);
-
- TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, WRITE_ONLY);
-
- transactionProxy.write(TestModel.JUNK_PATH, ImmutableNodes.containerNode(TestModel.JUNK_QNAME));
- transactionProxy.write(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
-
- DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
-
- assertTrue(ready instanceof ThreePhaseCommitCohortProxy);
-
- verifyCohortFutures((ThreePhaseCommitCohortProxy)ready, actorSelection(actorRef2),
- IllegalArgumentException.class);
- }
-
- @Test
- public void testGetIdentifier() {
- setupActorContextWithInitialCreateTransaction(getSystem(), READ_ONLY);
- TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_ONLY);
-
- Object id = transactionProxy.getIdentifier();
- assertNotNull("getIdentifier returned null", id);
- assertTrue("Invalid identifier: " + id, id.toString().contains(memberName));
- }
-
- @Test
- public void testClose() {
- ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
-
- doReturn(readDataReply(null)).when(mockActorContext).executeOperationAsync(
- eq(actorSelection(actorRef)), eqReadData(), any(Timeout.class));
-
- TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_WRITE);
-
- transactionProxy.read(TestModel.TEST_PATH);
-
- transactionProxy.close();
-
- verify(mockActorContext).sendOperationAsync(
- eq(actorSelection(actorRef)), isA(CloseTransaction.class));
- }
-
- private interface TransactionProxyOperation {
- void run(TransactionProxy transactionProxy);
- }
-
- private PrimaryShardInfo newPrimaryShardInfo(final ActorRef actorRef) {
- return new PrimaryShardInfo(getSystem().actorSelection(actorRef.path()), DataStoreVersions.CURRENT_VERSION);
- }
-
- private PrimaryShardInfo newPrimaryShardInfo(final ActorRef actorRef, final DataTree dataTree) {
- return new PrimaryShardInfo(getSystem().actorSelection(actorRef.path()), DataStoreVersions.CURRENT_VERSION,
- dataTree);
- }
-
- private void throttleOperation(final TransactionProxyOperation operation) {
- throttleOperation(operation, 1, true);
- }
-
- private void throttleOperation(final TransactionProxyOperation operation, final int outstandingOpsLimit,
- final boolean shardFound) {
- throttleOperation(operation, outstandingOpsLimit, shardFound, TimeUnit.MILLISECONDS.toNanos(
- mockActorContext.getDatastoreContext().getOperationTimeoutInMillis()));
- }
-
- private void throttleOperation(final TransactionProxyOperation operation, final int outstandingOpsLimit,
- final boolean shardFound, final long expectedCompletionTime) {
- ActorSystem actorSystem = getSystem();
- ActorRef shardActorRef = actorSystem.actorOf(Props.create(DoNothingActor.class));
-
- // Note that we setting batchedModificationCount to one less than what we need because in TransactionProxy
- // we now allow one extra permit to be allowed for ready
- doReturn(dataStoreContextBuilder.operationTimeoutInSeconds(2)
- .shardBatchedModificationCount(outstandingOpsLimit - 1).build()).when(mockActorContext)
- .getDatastoreContext();
-
- doReturn(actorSystem.actorSelection(shardActorRef.path())).when(mockActorContext)
- .actorSelection(shardActorRef.path().toString());
-
- if (shardFound) {
- doReturn(Futures.successful(newPrimaryShardInfo(shardActorRef))).when(mockActorContext)
- .findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD));
- doReturn(Futures.successful(newPrimaryShardInfo(shardActorRef))).when(mockActorContext)
- .findPrimaryShardAsync(eq("cars"));
-
- } else {
- doReturn(Futures.failed(new Exception("not found")))
- .when(mockActorContext).findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD));
- }
-
- doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync(
- eq(actorSystem.actorSelection(shardActorRef.path())), eqCreateTransaction(memberName, READ_WRITE),
- any(Timeout.class));
-
- TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_WRITE);
-
- long start = System.nanoTime();
-
- operation.run(transactionProxy);
-
- long end = System.nanoTime();
-
- Assert.assertTrue(String.format("Expected elapsed time: %s. Actual: %s",
- expectedCompletionTime, end - start),
- end - start > expectedCompletionTime && end - start < expectedCompletionTime * 2);
-
- }
-
- private void completeOperation(final TransactionProxyOperation operation) {
- completeOperation(operation, true);
- }
-
- private void completeOperation(final TransactionProxyOperation operation, final boolean shardFound) {
- ActorSystem actorSystem = getSystem();
- ActorRef shardActorRef = actorSystem.actorOf(Props.create(DoNothingActor.class));
-
- doReturn(actorSystem.actorSelection(shardActorRef.path())).when(mockActorContext)
- .actorSelection(shardActorRef.path().toString());
-
- if (shardFound) {
- doReturn(Futures.successful(newPrimaryShardInfo(shardActorRef))).when(mockActorContext)
- .findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD));
- } else {
- doReturn(Futures.failed(new PrimaryNotFoundException("test"))).when(mockActorContext)
- .findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD));
- }
-
- ActorRef txActorRef = actorSystem.actorOf(Props.create(DoNothingActor.class));
- String actorPath = txActorRef.path().toString();
- CreateTransactionReply createTransactionReply = new CreateTransactionReply(actorPath, nextTransactionId(),
- DataStoreVersions.CURRENT_VERSION);
-
- doReturn(actorSystem.actorSelection(actorPath)).when(mockActorContext).actorSelection(actorPath);
-
- doReturn(Futures.successful(createTransactionReply)).when(mockActorContext).executeOperationAsync(
- eq(actorSystem.actorSelection(shardActorRef.path())), eqCreateTransaction(memberName, READ_WRITE),
- any(Timeout.class));
-
- TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_WRITE);
-
- long start = System.nanoTime();
-
- operation.run(transactionProxy);
-
- long end = System.nanoTime();
-
- long expected = TimeUnit.MILLISECONDS.toNanos(mockActorContext.getDatastoreContext()
- .getOperationTimeoutInMillis());
- Assert.assertTrue(String.format("Expected elapsed time: %s. Actual: %s",
- expected, end - start), end - start <= expected);
- }
-
- private void completeOperationLocal(final TransactionProxyOperation operation, final DataTree dataTree) {
- ActorSystem actorSystem = getSystem();
- ActorRef shardActorRef = actorSystem.actorOf(Props.create(DoNothingActor.class));
-
- doReturn(actorSystem.actorSelection(shardActorRef.path())).when(mockActorContext)
- .actorSelection(shardActorRef.path().toString());
-
- doReturn(Futures.successful(newPrimaryShardInfo(shardActorRef, dataTree))).when(mockActorContext)
- .findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD));
-
- TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_WRITE);
-
- long start = System.nanoTime();
-
- operation.run(transactionProxy);
-
- long end = System.nanoTime();
-
- long expected = TimeUnit.MILLISECONDS.toNanos(mockActorContext.getDatastoreContext()
- .getOperationTimeoutInMillis());
- Assert.assertTrue(String.format("Expected elapsed time: %s. Actual: %s", expected, end - start),
- end - start <= expected);
- }
-
- private static DataTree createDataTree() {
- DataTree dataTree = mock(DataTree.class);
- DataTreeSnapshot dataTreeSnapshot = mock(DataTreeSnapshot.class);
- DataTreeModification dataTreeModification = mock(DataTreeModification.class);
-
- doReturn(dataTreeSnapshot).when(dataTree).takeSnapshot();
- doReturn(dataTreeModification).when(dataTreeSnapshot).newModification();
-
- return dataTree;
- }
-
- private static DataTree createDataTree(final NormalizedNode readResponse) {
- DataTree dataTree = mock(DataTree.class);
- DataTreeSnapshot dataTreeSnapshot = mock(DataTreeSnapshot.class);
- DataTreeModification dataTreeModification = mock(DataTreeModification.class);
-
- doReturn(dataTreeSnapshot).when(dataTree).takeSnapshot();
- doReturn(dataTreeModification).when(dataTreeSnapshot).newModification();
- doReturn(Optional.of(readResponse)).when(dataTreeModification).readNode(any(YangInstanceIdentifier.class));
-
- return dataTree;
- }
-
-
- @Test
- public void testWriteCompletionForLocalShard() {
- completeOperationLocal(transactionProxy -> {
- NormalizedNode nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
-
- transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
-
- transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
-
- }, createDataTree());
- }
-
- @Test
- public void testWriteThrottlingWhenShardFound() {
- throttleOperation(transactionProxy -> {
- NormalizedNode nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
-
- expectIncompleteBatchedModifications();
-
- transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
-
- transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
- });
- }
-
- @Test
- public void testWriteThrottlingWhenShardNotFound() {
- // Confirm that there is no throttling when the Shard is not found
- completeOperation(transactionProxy -> {
- NormalizedNode nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
-
- expectBatchedModifications(2);
-
- transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
-
- transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
- }, false);
-
- }
-
-
- @Test
- public void testWriteCompletion() {
- completeOperation(transactionProxy -> {
- NormalizedNode nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
-
- expectBatchedModifications(2);
-
- transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
-
- transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
- });
- }
-
- @Test
- public void testMergeThrottlingWhenShardFound() {
- throttleOperation(transactionProxy -> {
- NormalizedNode nodeToMerge = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
-
- expectIncompleteBatchedModifications();
-
- transactionProxy.merge(TestModel.TEST_PATH, nodeToMerge);
-
- transactionProxy.merge(TestModel.TEST_PATH, nodeToMerge);
- });
- }
-
- @Test
- public void testMergeThrottlingWhenShardNotFound() {
- completeOperation(transactionProxy -> {
- NormalizedNode nodeToMerge = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
-
- expectBatchedModifications(2);
-
- transactionProxy.merge(TestModel.TEST_PATH, nodeToMerge);
-
- transactionProxy.merge(TestModel.TEST_PATH, nodeToMerge);
- }, false);
- }
-
- @Test
- public void testMergeCompletion() {
- completeOperation(transactionProxy -> {
- NormalizedNode nodeToMerge = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
-
- expectBatchedModifications(2);
-
- transactionProxy.merge(TestModel.TEST_PATH, nodeToMerge);
-
- transactionProxy.merge(TestModel.TEST_PATH, nodeToMerge);
- });
-
- }
-
- @Test
- public void testMergeCompletionForLocalShard() {
- completeOperationLocal(transactionProxy -> {
- NormalizedNode nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
-
- transactionProxy.merge(TestModel.TEST_PATH, nodeToWrite);
-
- transactionProxy.merge(TestModel.TEST_PATH, nodeToWrite);
-
- }, createDataTree());
- }
-
-
- @Test
- public void testDeleteThrottlingWhenShardFound() {
-
- throttleOperation(transactionProxy -> {
- expectIncompleteBatchedModifications();
-
- transactionProxy.delete(TestModel.TEST_PATH);
-
- transactionProxy.delete(TestModel.TEST_PATH);
- });
- }
-
-
- @Test
- public void testDeleteThrottlingWhenShardNotFound() {
-
- completeOperation(transactionProxy -> {
- expectBatchedModifications(2);
-
- transactionProxy.delete(TestModel.TEST_PATH);
-
- transactionProxy.delete(TestModel.TEST_PATH);
- }, false);
- }
-
- @Test
- public void testDeleteCompletionForLocalShard() {
- completeOperationLocal(transactionProxy -> {
-
- transactionProxy.delete(TestModel.TEST_PATH);
-
- transactionProxy.delete(TestModel.TEST_PATH);
- }, createDataTree());
-
- }
-
- @Test
- public void testDeleteCompletion() {
- completeOperation(transactionProxy -> {
- expectBatchedModifications(2);
-
- transactionProxy.delete(TestModel.TEST_PATH);
-
- transactionProxy.delete(TestModel.TEST_PATH);
- });
-
- }
-
- @Test
- public void testReadThrottlingWhenShardFound() {
-
- throttleOperation(transactionProxy -> {
- doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync(
- any(ActorSelection.class), eqReadData());
-
- transactionProxy.read(TestModel.TEST_PATH);
-
- transactionProxy.read(TestModel.TEST_PATH);
- });
- }
-
- @Test
- public void testReadThrottlingWhenShardNotFound() {
-
- completeOperation(transactionProxy -> {
- doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync(
- any(ActorSelection.class), eqReadData());
-
- transactionProxy.read(TestModel.TEST_PATH);
-
- transactionProxy.read(TestModel.TEST_PATH);
- }, false);
- }
-
-
- @Test
- public void testReadCompletion() {
- completeOperation(transactionProxy -> {
- NormalizedNode nodeToRead = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
-
- doReturn(readDataReply(nodeToRead)).when(mockActorContext).executeOperationAsync(
- any(ActorSelection.class), eqReadData(), any(Timeout.class));
-
- transactionProxy.read(TestModel.TEST_PATH);
-
- transactionProxy.read(TestModel.TEST_PATH);
- });
-
- }
-
- @Test
- public void testReadCompletionForLocalShard() {
- final NormalizedNode nodeToRead = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
- completeOperationLocal(transactionProxy -> {
- transactionProxy.read(TestModel.TEST_PATH);
-
- transactionProxy.read(TestModel.TEST_PATH);
- }, createDataTree(nodeToRead));
-
- }
-
- @Test
- public void testReadCompletionForLocalShardWhenExceptionOccurs() {
- completeOperationLocal(transactionProxy -> {
- transactionProxy.read(TestModel.TEST_PATH);
-
- transactionProxy.read(TestModel.TEST_PATH);
- }, createDataTree());
-
- }
-
- @Test
- public void testExistsThrottlingWhenShardFound() {
-
- throttleOperation(transactionProxy -> {
- doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync(
- any(ActorSelection.class), eqDataExists());
-
- transactionProxy.exists(TestModel.TEST_PATH);
-
- transactionProxy.exists(TestModel.TEST_PATH);
- });
- }
-
- @Test
- public void testExistsThrottlingWhenShardNotFound() {
-
- completeOperation(transactionProxy -> {
- doReturn(incompleteFuture()).when(mockActorContext).executeOperationAsync(
- any(ActorSelection.class), eqDataExists());
-
- transactionProxy.exists(TestModel.TEST_PATH);
-
- transactionProxy.exists(TestModel.TEST_PATH);
- }, false);
- }
-
-
- @Test
- public void testExistsCompletion() {
- completeOperation(transactionProxy -> {
- doReturn(dataExistsReply(true)).when(mockActorContext).executeOperationAsync(
- any(ActorSelection.class), eqDataExists(), any(Timeout.class));
-
- transactionProxy.exists(TestModel.TEST_PATH);
-
- transactionProxy.exists(TestModel.TEST_PATH);
- });
-
- }
-
- @Test
- public void testExistsCompletionForLocalShard() {
- final NormalizedNode nodeToRead = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
- completeOperationLocal(transactionProxy -> {
- transactionProxy.exists(TestModel.TEST_PATH);
-
- transactionProxy.exists(TestModel.TEST_PATH);
- }, createDataTree(nodeToRead));
-
- }
-
- @Test
- public void testExistsCompletionForLocalShardWhenExceptionOccurs() {
- completeOperationLocal(transactionProxy -> {
- transactionProxy.exists(TestModel.TEST_PATH);
-
- transactionProxy.exists(TestModel.TEST_PATH);
- }, createDataTree());
-
- }
-
- @Test
- public void testReadyThrottling() {
-
- throttleOperation(transactionProxy -> {
- NormalizedNode nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
-
- expectBatchedModifications(1);
-
- transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
-
- transactionProxy.ready();
- });
- }
-
- @Test
- public void testReadyThrottlingWithTwoTransactionContexts() {
- throttleOperation(transactionProxy -> {
- NormalizedNode nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
- NormalizedNode carsNode = ImmutableNodes.containerNode(CarsModel.BASE_QNAME);
-
- expectBatchedModifications(2);
-
- transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
-
- // Trying to write to Cars will cause another transaction context to get created
- transactionProxy.write(CarsModel.BASE_PATH, carsNode);
-
- // Now ready should block for both transaction contexts
- transactionProxy.ready();
- }, 1, true, TimeUnit.MILLISECONDS.toNanos(mockActorContext.getDatastoreContext()
- .getOperationTimeoutInMillis()) * 2);
- }
-
- private void testModificationOperationBatching(final TransactionType type) {
- int shardBatchedModificationCount = 3;
- dataStoreContextBuilder.shardBatchedModificationCount(shardBatchedModificationCount);
-
- ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), type);
-
- expectBatchedModifications(actorRef, shardBatchedModificationCount);
-
- YangInstanceIdentifier writePath1 = TestModel.TEST_PATH;
- NormalizedNode writeNode1 = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
-
- YangInstanceIdentifier writePath2 = TestModel.OUTER_LIST_PATH;
- NormalizedNode writeNode2 = ImmutableNodes.containerNode(TestModel.OUTER_LIST_QNAME);
-
- YangInstanceIdentifier writePath3 = TestModel.INNER_LIST_PATH;
- NormalizedNode writeNode3 = ImmutableNodes.containerNode(TestModel.INNER_LIST_QNAME);
-
- YangInstanceIdentifier mergePath1 = TestModel.TEST_PATH;
- NormalizedNode mergeNode1 = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
-
- YangInstanceIdentifier mergePath2 = TestModel.OUTER_LIST_PATH;
- NormalizedNode mergeNode2 = ImmutableNodes.containerNode(TestModel.OUTER_LIST_QNAME);
-
- YangInstanceIdentifier mergePath3 = TestModel.INNER_LIST_PATH;
- NormalizedNode mergeNode3 = ImmutableNodes.containerNode(TestModel.INNER_LIST_QNAME);
-
- YangInstanceIdentifier deletePath1 = TestModel.TEST_PATH;
- YangInstanceIdentifier deletePath2 = TestModel.OUTER_LIST_PATH;
-
- TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, type);
-
- transactionProxy.write(writePath1, writeNode1);
- transactionProxy.write(writePath2, writeNode2);
- transactionProxy.delete(deletePath1);
- transactionProxy.merge(mergePath1, mergeNode1);
- transactionProxy.merge(mergePath2, mergeNode2);
- transactionProxy.write(writePath3, writeNode3);
- transactionProxy.merge(mergePath3, mergeNode3);
- transactionProxy.delete(deletePath2);
-
- // This sends the last batch.
- transactionProxy.ready();
-
- List<BatchedModifications> batchedModifications = captureBatchedModifications(actorRef);
- assertEquals("Captured BatchedModifications count", 3, batchedModifications.size());
-
- verifyBatchedModifications(batchedModifications.get(0), false, new WriteModification(writePath1, writeNode1),
- new WriteModification(writePath2, writeNode2), new DeleteModification(deletePath1));
-
- verifyBatchedModifications(batchedModifications.get(1), false, new MergeModification(mergePath1, mergeNode1),
- new MergeModification(mergePath2, mergeNode2), new WriteModification(writePath3, writeNode3));
-
- verifyBatchedModifications(batchedModifications.get(2), true, true,
- new MergeModification(mergePath3, mergeNode3), new DeleteModification(deletePath2));
-
- assertEquals("getTotalMessageCount", 3, batchedModifications.get(2).getTotalMessagesSent());
- }
-
- @Test
- public void testReadWriteModificationOperationBatching() {
- testModificationOperationBatching(READ_WRITE);
- }
-
- @Test
- public void testWriteOnlyModificationOperationBatching() {
- testModificationOperationBatching(WRITE_ONLY);
- }
-
- @Test
- public void testOptimizedWriteOnlyModificationOperationBatching() {
- dataStoreContextBuilder.writeOnlyTransactionOptimizationsEnabled(true);
- testModificationOperationBatching(WRITE_ONLY);
- }
-
- @Test
- public void testModificationOperationBatchingWithInterleavedReads() throws Exception {
-
- int shardBatchedModificationCount = 10;
- dataStoreContextBuilder.shardBatchedModificationCount(shardBatchedModificationCount);
-
- ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
-
- expectBatchedModifications(actorRef, shardBatchedModificationCount);
-
- final YangInstanceIdentifier writePath1 = TestModel.TEST_PATH;
- final NormalizedNode writeNode1 = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
-
- YangInstanceIdentifier writePath2 = TestModel.OUTER_LIST_PATH;
- NormalizedNode writeNode2 = ImmutableNodes.containerNode(TestModel.OUTER_LIST_QNAME);
-
- final YangInstanceIdentifier mergePath1 = TestModel.TEST_PATH;
- final NormalizedNode mergeNode1 = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
-
- YangInstanceIdentifier mergePath2 = TestModel.INNER_LIST_PATH;
- NormalizedNode mergeNode2 = ImmutableNodes.containerNode(TestModel.INNER_LIST_QNAME);
-
- final YangInstanceIdentifier deletePath = TestModel.OUTER_LIST_PATH;
-
- doReturn(readDataReply(writeNode2)).when(mockActorContext).executeOperationAsync(
- eq(actorSelection(actorRef)), eqReadData(writePath2), any(Timeout.class));
-
- doReturn(readDataReply(mergeNode2)).when(mockActorContext).executeOperationAsync(
- eq(actorSelection(actorRef)), eqReadData(mergePath2), any(Timeout.class));
-
- doReturn(dataExistsReply(true)).when(mockActorContext).executeOperationAsync(
- eq(actorSelection(actorRef)), eqDataExists(), any(Timeout.class));
-
- TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_WRITE);
-
- transactionProxy.write(writePath1, writeNode1);
- transactionProxy.write(writePath2, writeNode2);
-
- Optional<NormalizedNode> readOptional = transactionProxy.read(writePath2).get(5, TimeUnit.SECONDS);
-
- assertEquals("Response NormalizedNode", Optional.of(writeNode2), readOptional);
-
- transactionProxy.merge(mergePath1, mergeNode1);
- transactionProxy.merge(mergePath2, mergeNode2);
-
- readOptional = transactionProxy.read(mergePath2).get(5, TimeUnit.SECONDS);
-
- transactionProxy.delete(deletePath);
-
- Boolean exists = transactionProxy.exists(TestModel.TEST_PATH).get();
- assertEquals("Exists response", Boolean.TRUE, exists);
-
- assertEquals("Response NormalizedNode", Optional.of(mergeNode2), readOptional);
-
- List<BatchedModifications> batchedModifications = captureBatchedModifications(actorRef);
- assertEquals("Captured BatchedModifications count", 3, batchedModifications.size());
-
- verifyBatchedModifications(batchedModifications.get(0), false, new WriteModification(writePath1, writeNode1),
- new WriteModification(writePath2, writeNode2));
-
- verifyBatchedModifications(batchedModifications.get(1), false, new MergeModification(mergePath1, mergeNode1),
- new MergeModification(mergePath2, mergeNode2));
-
- verifyBatchedModifications(batchedModifications.get(2), false, new DeleteModification(deletePath));
-
- InOrder inOrder = Mockito.inOrder(mockActorContext);
- inOrder.verify(mockActorContext).executeOperationAsync(
- eq(actorSelection(actorRef)), isA(BatchedModifications.class), any(Timeout.class));
-
- inOrder.verify(mockActorContext).executeOperationAsync(
- eq(actorSelection(actorRef)), eqReadData(writePath2), any(Timeout.class));
-
- inOrder.verify(mockActorContext).executeOperationAsync(
- eq(actorSelection(actorRef)), isA(BatchedModifications.class), any(Timeout.class));
-
- inOrder.verify(mockActorContext).executeOperationAsync(
- eq(actorSelection(actorRef)), eqReadData(mergePath2), any(Timeout.class));
-
- inOrder.verify(mockActorContext).executeOperationAsync(
- eq(actorSelection(actorRef)), isA(BatchedModifications.class), any(Timeout.class));
-
- inOrder.verify(mockActorContext).executeOperationAsync(
- eq(actorSelection(actorRef)), eqDataExists(), any(Timeout.class));
- }
-
- @Test
- public void testReadRoot() throws InterruptedException, ExecutionException, java.util.concurrent.TimeoutException {
- EffectiveModelContext schemaContext = SchemaContextHelper.full();
- Configuration configuration = mock(Configuration.class);
- doReturn(configuration).when(mockActorContext).getConfiguration();
- doReturn(schemaContext).when(mockActorContext).getSchemaContext();
- doReturn(Sets.newHashSet("test", "cars")).when(configuration).getAllShardNames();
-
- NormalizedNode expectedNode1 = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
- NormalizedNode expectedNode2 = ImmutableNodes.containerNode(CarsModel.CARS_QNAME);
-
- setUpReadData("test", NormalizedNodeAggregatorTest.getRootNode(expectedNode1, schemaContext));
- setUpReadData("cars", NormalizedNodeAggregatorTest.getRootNode(expectedNode2, schemaContext));
-
- doReturn(MemberName.forName(memberName)).when(mockActorContext).getCurrentMemberName();
-
- doReturn(getSystem().dispatchers().defaultGlobalDispatcher()).when(mockActorContext).getClientDispatcher();
-
- TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_ONLY);
-
- Optional<NormalizedNode> readOptional = transactionProxy.read(
- YangInstanceIdentifier.of()).get(5, TimeUnit.SECONDS);
-
- assertTrue("NormalizedNode isPresent", readOptional.isPresent());
-
- NormalizedNode normalizedNode = readOptional.orElseThrow();
-
- assertTrue("Expect value to be a Collection", normalizedNode.body() instanceof Collection);
-
- @SuppressWarnings("unchecked")
- Collection<NormalizedNode> collection = (Collection<NormalizedNode>) normalizedNode.body();
-
- for (NormalizedNode node : collection) {
- assertTrue("Expected " + node + " to be a ContainerNode", node instanceof ContainerNode);
- }
-
- assertTrue("Child with QName = " + TestModel.TEST_QNAME + " not found",
- NormalizedNodeAggregatorTest.findChildWithQName(collection, TestModel.TEST_QNAME) != null);
-
- assertEquals(expectedNode1, NormalizedNodeAggregatorTest.findChildWithQName(collection, TestModel.TEST_QNAME));
-
- assertTrue("Child with QName = " + CarsModel.BASE_QNAME + " not found",
- NormalizedNodeAggregatorTest.findChildWithQName(collection, CarsModel.BASE_QNAME) != null);
-
- assertEquals(expectedNode2, NormalizedNodeAggregatorTest.findChildWithQName(collection, CarsModel.BASE_QNAME));
- }
-
-
- private void setUpReadData(final String shardName, final NormalizedNode expectedNode) {
- ActorSystem actorSystem = getSystem();
- ActorRef shardActorRef = getSystem().actorOf(Props.create(DoNothingActor.class));
-
- doReturn(getSystem().actorSelection(shardActorRef.path())).when(mockActorContext)
- .actorSelection(shardActorRef.path().toString());
-
- doReturn(primaryShardInfoReply(getSystem(), shardActorRef)).when(mockActorContext)
- .findPrimaryShardAsync(eq(shardName));
-
- ActorRef txActorRef = actorSystem.actorOf(Props.create(DoNothingActor.class));
-
- doReturn(actorSystem.actorSelection(txActorRef.path())).when(mockActorContext)
- .actorSelection(txActorRef.path().toString());
-
- doReturn(Futures.successful(createTransactionReply(txActorRef, DataStoreVersions.CURRENT_VERSION)))
- .when(mockActorContext).executeOperationAsync(eq(actorSystem.actorSelection(shardActorRef.path())),
- eqCreateTransaction(memberName, TransactionType.READ_ONLY), any(Timeout.class));
-
- doReturn(readDataReply(expectedNode)).when(mockActorContext).executeOperationAsync(
- eq(actorSelection(txActorRef)), eqReadData(YangInstanceIdentifier.of()), any(Timeout.class));
- }
-}
* terms of the Eclipse Public License v1.0 which accompanies this distribution,
* and is available at http://www.eclipse.org/legal/epl-v10.html
*/
-
package org.opendaylight.controller.cluster.datastore;
import com.codahale.metrics.Timer;
* TransactionRateLimitingCallback computes the new transaction rate limit on the successful completion of a
* transaction.
*/
-public class TransactionRateLimitingCallback implements OperationCallback {
+@Deprecated(since = "9.0.0", forRemoval = true)
+final class TransactionRateLimitingCallback implements OperationCallback {
private static Ticker TICKER = Ticker.systemTicker();
private enum State {
private long elapsedTime;
private volatile State state = State.STOPPED;
- TransactionRateLimitingCallback(ActorUtils actorUtils) {
+ TransactionRateLimitingCallback(final ActorUtils actorUtils) {
commitTimer = actorUtils.getOperationTimer(ActorUtils.COMMIT);
}
}
@VisibleForTesting
- static void setTicker(Ticker ticker) {
+ static void setTicker(final Ticker ticker) {
TICKER = ticker;
}
}
*
* @author Thomas Pantelis
*/
+@Deprecated(since = "9.0.0", forRemoval = true)
@RunWith(MockitoJUnitRunner.StrictStubs.class)
public class TransactionRateLimitingCallbackTest {
@Mock