From 3404a29b4ee3b146a5ff98889bcf1afdebb65c3f Mon Sep 17 00:00:00 2001 From: "tadei.bilan" Date: Thu, 2 Jul 2020 13:48:54 +0300 Subject: [PATCH] Specialize TransactionContextWrapper Most of the time we are talking to local leader, in which case we do not need to queue messages and bounce them through the queue. JIRA: CONTROLLER-1952 Change-Id: I07d85c82c2ab6e4251c70b2e6d1dafa2dc455d39 Signed-off-by: tadei.bilan Signed-off-by: Ivan Hrasko Signed-off-by: Robert Varga --- .../AbstractTransactionContextFactory.java | 81 +++++++++----- .../AbstractTransactionContextWrapper.java | 72 ++++++++++++ ... => DelayedTransactionContextWrapper.java} | 103 ++++++++---------- .../DirectTransactionContextWrapper.java | 49 +++++++++ .../RemoteTransactionContextSupport.java | 7 +- .../cluster/datastore/TransactionContext.java | 2 +- .../cluster/datastore/TransactionProxy.java | 22 ++-- ...DelayedTransactionContextWrapperTest.java} | 8 +- .../DirectTransactionContextWrapperTest.java | 47 ++++++++ 9 files changed, 285 insertions(+), 106 deletions(-) create mode 100644 opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/AbstractTransactionContextWrapper.java rename opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/{TransactionContextWrapper.java => DelayedTransactionContextWrapper.java} (84%) create mode 100644 opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DirectTransactionContextWrapper.java rename opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/{TransactionContextWrapperTest.java => DelayedTransactionContextWrapperTest.java} (82%) create mode 100644 opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DirectTransactionContextWrapperTest.java diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/AbstractTransactionContextFactory.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/AbstractTransactionContextFactory.java index 6d573dedf5..211ebec63c 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/AbstractTransactionContextFactory.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/AbstractTransactionContextFactory.java @@ -10,7 +10,6 @@ package org.opendaylight.controller.cluster.datastore; import static java.util.Objects.requireNonNull; import akka.actor.ActorSelection; -import akka.dispatch.OnComplete; import java.util.Collection; import java.util.Optional; import java.util.concurrent.ConcurrentHashMap; @@ -80,20 +79,48 @@ abstract class AbstractTransactionContextFactory findPrimaryFuture = findPrimaryShard(shardName, parent.getIdentifier()); + final DelayedTransactionContextWrapper contextWrapper = new DelayedTransactionContextWrapper( + parent.getIdentifier(), actorUtils, shardName); + final Future findPrimaryFuture = findPrimaryShard(shardName, parent.getIdentifier()); if (findPrimaryFuture.isCompleted()) { - Try maybe = findPrimaryFuture.value().get(); + final Try maybe = findPrimaryFuture.value().get(); if (maybe.isSuccess()) { - onFindPrimaryShardSuccess(maybe.get(), parent, shardName, transactionContextWrapper); + return maybeCreateDirectTransactionContextWrapper(maybe.get(), parent, shardName, + contextWrapper); } else { - onFindPrimaryShardFailure(maybe.failed().get(), parent, shardName, transactionContextWrapper); + onFindPrimaryShardFailure(maybe.failed().get(), parent, shardName, + contextWrapper); } } else { - findPrimaryFuture.onComplete(new OnComplete() { - @Override - public void onComplete(final Throwable failure, final PrimaryShardInfo primaryShardInfo) { - if (failure == null) { - onFindPrimaryShardSuccess(primaryShardInfo, parent, shardName, transactionContextWrapper); - } else { - onFindPrimaryShardFailure(failure, parent, shardName, transactionContextWrapper); - } + findPrimaryFuture.onComplete((result) -> { + if (result.isSuccess()) { + onFindPrimaryShardSuccess(result.get(), parent, shardName, contextWrapper); + } else { + onFindPrimaryShardFailure(result.failed().get(), parent, shardName, contextWrapper); } + return null; }, actorUtils.getClientDispatcher()); } - - return transactionContextWrapper; + return contextWrapper; } private void updateShardInfo(final String shardName, final PrimaryShardInfo primaryShardInfo) { diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/AbstractTransactionContextWrapper.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/AbstractTransactionContextWrapper.java new file mode 100644 index 0000000000..49dac87cc9 --- /dev/null +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/AbstractTransactionContextWrapper.java @@ -0,0 +1,72 @@ +/* + * Copyright (c) 2021 PANTHEON.tech, s.r.o. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster.datastore; + +import static java.util.Objects.requireNonNull; + +import akka.actor.ActorSelection; +import java.util.Optional; +import java.util.SortedSet; +import java.util.concurrent.TimeUnit; +import org.eclipse.jdt.annotation.NonNull; +import org.eclipse.jdt.annotation.Nullable; +import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier; +import org.opendaylight.controller.cluster.datastore.utils.ActorUtils; +import scala.concurrent.Future; + +/** + * A helper class that wraps an eventual TransactionContext instance. We have two specializations: + *
    + *
  • {@link DelayedTransactionContextWrapper}, which enqueues operations towards the backend
  • + *
  • {@link DirectTransactionContextWrapper}, which sends operations to the backend
  • + *
+ */ +abstract class AbstractTransactionContextWrapper { + private final TransactionIdentifier identifier; + private final OperationLimiter limiter; + private final String shardName; + + AbstractTransactionContextWrapper(@NonNull final TransactionIdentifier identifier, + @NonNull final ActorUtils actorUtils, @NonNull final String shardName) { + this.identifier = requireNonNull(identifier); + this.shardName = requireNonNull(shardName); + limiter = new OperationLimiter(identifier, + // 1 extra permit for the ready operation + actorUtils.getDatastoreContext().getShardBatchedModificationCount() + 1, + TimeUnit.MILLISECONDS.toSeconds(actorUtils.getDatastoreContext().getOperationTimeoutInMillis())); + } + + final TransactionIdentifier getIdentifier() { + return identifier; + } + + final OperationLimiter getLimiter() { + return limiter; + } + + final String getShardName() { + return shardName; + } + + abstract @Nullable TransactionContext getTransactionContext(); + + /** + * Either enqueue or execute specified operation. + * + * @param op Operation to (eventually) execute + */ + abstract void maybeExecuteTransactionOperation(TransactionOperation op); + + /** + * Mark the transaction as ready. + * + * @param participatingShardNames Shards which participate on the transaction + * @return Future indicating the transaction has been readied on the backend + */ + abstract @NonNull Future readyTransaction(Optional> participatingShardNames); +} diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionContextWrapper.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DelayedTransactionContextWrapper.java similarity index 84% rename from opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionContextWrapper.java rename to opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DelayedTransactionContextWrapper.java index 2facfbd1a2..17df235363 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionContextWrapper.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DelayedTransactionContextWrapper.java @@ -8,7 +8,6 @@ package org.opendaylight.controller.cluster.datastore; import static com.google.common.base.Preconditions.checkState; -import static java.util.Objects.requireNonNull; import akka.actor.ActorSelection; import akka.dispatch.Futures; @@ -19,8 +18,8 @@ import java.util.List; import java.util.Map.Entry; import java.util.Optional; import java.util.SortedSet; -import java.util.concurrent.TimeUnit; import org.checkerframework.checker.lock.qual.GuardedBy; +import org.eclipse.jdt.annotation.NonNull; import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier; import org.opendaylight.controller.cluster.datastore.utils.ActorUtils; import org.slf4j.Logger; @@ -29,23 +28,20 @@ import scala.concurrent.Future; import scala.concurrent.Promise; /** - * A helper class that wraps an eventual TransactionContext instance. Operations destined for the target - * TransactionContext instance are cached until the TransactionContext instance becomes available at which - * time they are executed. + * Delayed implementation of TransactionContextWrapper. Operations destined for the target + * TransactionContext instance are cached until the TransactionContext instance becomes + * available at which time they are executed. * * @author Thomas Pantelis */ -class TransactionContextWrapper { - private static final Logger LOG = LoggerFactory.getLogger(TransactionContextWrapper.class); +final class DelayedTransactionContextWrapper extends AbstractTransactionContextWrapper { + private static final Logger LOG = LoggerFactory.getLogger(DelayedTransactionContextWrapper.class); /** * The list of transaction operations to execute once the TransactionContext becomes available. */ @GuardedBy("queuedTxOperations") private final List> queuedTxOperations = new ArrayList<>(); - private final TransactionIdentifier identifier; - private final OperationLimiter limiter; - private final String shardName; /** * The resulting TransactionContext. @@ -56,22 +52,45 @@ class TransactionContextWrapper { @GuardedBy("queuedTxOperations") private boolean pendingEnqueue; - TransactionContextWrapper(final TransactionIdentifier identifier, final ActorUtils actorUtils, - final String shardName) { - this.identifier = requireNonNull(identifier); - this.limiter = new OperationLimiter(identifier, - // 1 extra permit for the ready operation - actorUtils.getDatastoreContext().getShardBatchedModificationCount() + 1, - TimeUnit.MILLISECONDS.toSeconds(actorUtils.getDatastoreContext().getOperationTimeoutInMillis())); - this.shardName = requireNonNull(shardName); + DelayedTransactionContextWrapper(@NonNull final TransactionIdentifier identifier, + @NonNull final ActorUtils actorUtils, @NonNull final String shardName) { + super(identifier, actorUtils, shardName); } + @Override TransactionContext getTransactionContext() { return transactionContext; } - TransactionIdentifier getIdentifier() { - return identifier; + @Override + void maybeExecuteTransactionOperation(final TransactionOperation op) { + final TransactionContext localContext = transactionContext; + if (localContext != null) { + op.invoke(localContext, null); + } else { + // The shard Tx hasn't been created yet so add the Tx operation to the Tx Future + // callback to be executed after the Tx is created. + enqueueTransactionOperation(op); + } + } + + @Override + Future readyTransaction(final Optional> participatingShardNames) { + // avoid the creation of a promise and a TransactionOperation + final TransactionContext localContext = transactionContext; + if (localContext != null) { + return localContext.readyTransaction(null, participatingShardNames); + } + + final Promise promise = Futures.promise(); + enqueueTransactionOperation(new TransactionOperation() { + @Override + public void invoke(final TransactionContext newTransactionContext, final Boolean havePermit) { + promise.completeWith(newTransactionContext.readyTransaction(havePermit, participatingShardNames)); + } + }); + + return promise.future(); } /** @@ -97,7 +116,7 @@ class TransactionContextWrapper { synchronized (queuedTxOperations) { contextOnEntry = transactionContext; if (contextOnEntry == null) { - checkState(pendingEnqueue == false, "Concurrent access to transaction %s detected", identifier); + checkState(pendingEnqueue == false, "Concurrent access to transaction %s detected", getIdentifier()); pendingEnqueue = true; } } @@ -112,15 +131,15 @@ class TransactionContextWrapper { TransactionContext finishHandoff = null; try { // Acquire the permit, - final boolean havePermit = limiter.acquire(); + final boolean havePermit = getLimiter().acquire(); if (!havePermit) { - LOG.warn("Failed to acquire enqueue operation permit for transaction {} on shard {}", identifier, - shardName); + LOG.warn("Failed to acquire enqueue operation permit for transaction {} on shard {}", getIdentifier(), + getShardName()); } // Ready to enqueue, take the lock again and append the operation synchronized (queuedTxOperations) { - LOG.debug("Tx {} Queuing TransactionOperation", identifier); + LOG.debug("Tx {} Queuing TransactionOperation", getIdentifier()); queuedTxOperations.add(new SimpleImmutableEntry<>(operation, havePermit)); pendingEnqueue = false; cleanupEnqueue = false; @@ -141,17 +160,6 @@ class TransactionContextWrapper { } } - void maybeExecuteTransactionOperation(final TransactionOperation op) { - final TransactionContext localContext = transactionContext; - if (localContext != null) { - op.invoke(localContext, null); - } else { - // The shard Tx hasn't been created yet so add the Tx operation to the Tx Future - // callback to be executed after the Tx is created. - enqueueTransactionOperation(op); - } - } - void executePriorTransactionOperations(final TransactionContext localTransactionContext) { while (true) { // Access to queuedTxOperations and transactionContext must be protected and atomic @@ -190,32 +198,11 @@ class TransactionContextWrapper { if (permit.booleanValue() && !localTransactionContext.usesOperationLimiting()) { // If the context is not using limiting we need to release operations as we are queueing them, so // user threads are not charged for them. - limiter.release(); + getLimiter().release(); } oper.getKey().invoke(localTransactionContext, permit); } } } - Future readyTransaction(Optional> participatingShardNames) { - // avoid the creation of a promise and a TransactionOperation - final TransactionContext localContext = transactionContext; - if (localContext != null) { - return localContext.readyTransaction(null, participatingShardNames); - } - - final Promise promise = Futures.promise(); - enqueueTransactionOperation(new TransactionOperation() { - @Override - public void invoke(final TransactionContext newTransactionContext, final Boolean havePermit) { - promise.completeWith(newTransactionContext.readyTransaction(havePermit, participatingShardNames)); - } - }); - - return promise.future(); - } - - OperationLimiter getLimiter() { - return limiter; - } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DirectTransactionContextWrapper.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DirectTransactionContextWrapper.java new file mode 100644 index 0000000000..f004088134 --- /dev/null +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DirectTransactionContextWrapper.java @@ -0,0 +1,49 @@ +/* + * Copyright (c) 2021 PANTHEON.tech, s.r.o. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster.datastore; + +import static java.util.Objects.requireNonNull; + +import akka.actor.ActorSelection; +import java.util.Optional; +import java.util.SortedSet; +import org.eclipse.jdt.annotation.NonNull; +import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier; +import org.opendaylight.controller.cluster.datastore.utils.ActorUtils; +import scala.concurrent.Future; + +/** + * Direct implementation of TransactionContextWrapper. Operation are executed directly on TransactionContext. Always + * has completed context and executes on local shard. + */ +final class DirectTransactionContextWrapper extends AbstractTransactionContextWrapper { + private final TransactionContext transactionContext; + + DirectTransactionContextWrapper(@NonNull final TransactionIdentifier identifier, + @NonNull final ActorUtils actorUtils, + @NonNull final String shardName, + @NonNull final TransactionContext transactionContext) { + super(identifier, actorUtils, shardName); + this.transactionContext = requireNonNull(transactionContext); + } + + @Override + TransactionContext getTransactionContext() { + return transactionContext; + } + + @Override + void maybeExecuteTransactionOperation(final TransactionOperation op) { + op.invoke(transactionContext, null); + } + + @Override + Future readyTransaction(final Optional> participatingShardNames) { + return transactionContext.readyTransaction(null, participatingShardNames); + } +} diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/RemoteTransactionContextSupport.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/RemoteTransactionContextSupport.java index 4276f3be52..333d11b4f0 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/RemoteTransactionContextSupport.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/RemoteTransactionContextSupport.java @@ -35,7 +35,7 @@ import scala.concurrent.duration.FiniteDuration; *

* The end result from a completed CreateTransaction message is a TransactionContext that is * used to perform transaction operations. Transaction operations that occur before the - * CreateTransaction completes are cache via a TransactionContextWrapper and executed once the + * CreateTransaction completes are cached via a DelayedTransactionContextWrapper and executed once the * CreateTransaction completes, successfully or not. */ final class RemoteTransactionContextSupport { @@ -59,9 +59,9 @@ final class RemoteTransactionContextSupport { private final Timeout createTxMessageTimeout; - private final TransactionContextWrapper transactionContextWrapper; + private final DelayedTransactionContextWrapper transactionContextWrapper; - RemoteTransactionContextSupport(final TransactionContextWrapper transactionContextWrapper, + RemoteTransactionContextSupport(final DelayedTransactionContextWrapper transactionContextWrapper, final TransactionProxy parent, final String shardName) { this.parent = requireNonNull(parent); this.shardName = shardName; @@ -231,7 +231,6 @@ final class RemoteTransactionContextSupport { localTransactionContext = new NoOpTransactionContext(exception, getIdentifier()); } - transactionContextWrapper.executePriorTransactionOperations(localTransactionContext); } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionContext.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionContext.java index cafc2a82ea..c96233cceb 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionContext.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionContext.java @@ -36,7 +36,7 @@ interface TransactionContext { Future directCommit(Boolean havePermit); /** - * Invoked by {@link TransactionContextWrapper} when it has finished handing + * Invoked by {@link AbstractTransactionContextWrapper} when it has finished handing * off operations to this context. From this point on, the context is responsible * for throttling operations. * diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionProxy.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionProxy.java index ce0461ffaf..16a979fa6a 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionProxy.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionProxy.java @@ -69,7 +69,7 @@ public class TransactionProxy extends AbstractDOMStoreTransaction txContextWrappers = new TreeMap<>(); + private final Map txContextWrappers = new TreeMap<>(); private final AbstractTransactionContextFactory txContextFactory; private final TransactionType type; private TransactionState state = TransactionState.OPEN; @@ -95,7 +95,7 @@ public class TransactionProxy extends AbstractDOMStoreTransaction proxyFuture = SettableFuture.create(); - TransactionContextWrapper contextWrapper = getContextWrapper(shardName); + AbstractTransactionContextWrapper contextWrapper = getContextWrapper(shardName); contextWrapper.maybeExecuteTransactionOperation(new TransactionOperation() { @Override public void invoke(final TransactionContext transactionContext, final Boolean havePermit) { @@ -253,7 +253,7 @@ public class TransactionProxy extends AbstractDOMStoreTransaction e = Iterables.getOnlyElement( + final Entry e = Iterables.getOnlyElement( txContextWrappers.entrySet()); ret = createSingleCommitCohort(e.getKey(), e.getValue()); break; @@ -297,7 +297,7 @@ public class TransactionProxy extends AbstractDOMStoreTransaction createSingleCommitCohort(final String shardName, - final TransactionContextWrapper contextWrapper) { + final AbstractTransactionContextWrapper contextWrapper) { LOG.debug("Tx {} Readying transaction for shard {}", getIdentifier(), shardName); @@ -338,10 +338,10 @@ public class TransactionProxy extends AbstractDOMStoreTransaction cohorts = new ArrayList<>(txContextWrappers.size()); final Optional> shardNames = Optional.of(new TreeSet<>(txContextWrappers.keySet())); - for (Entry e : txContextWrappers.entrySet()) { + for (Entry e : txContextWrappers.entrySet()) { LOG.debug("Tx {} Readying transaction for shard {}", getIdentifier(), e.getKey()); - final TransactionContextWrapper wrapper = e.getValue(); + final AbstractTransactionContextWrapper wrapper = e.getValue(); // The remote tx version is obtained the via TransactionContext which may not be available yet so // we pass a Supplier to dynamically obtain it. Once the ready Future is resolved the @@ -361,17 +361,17 @@ public class TransactionProxy extends AbstractDOMStoreTransaction