From d71b6614d6cdb5a98f086edeb56f5c52f365c61c Mon Sep 17 00:00:00 2001 From: Robert Varga Date: Tue, 19 May 2015 22:04:10 +0200 Subject: [PATCH] Move operation limiter down to TransactionContextWrapper The limiter tracks the number of operations invoked on the shard leader, which does not correspond to the number of operations executed on the frontend. The appropriate entity to decide what is throttled how is the TransactionContext, which unfortunately may not exist. We will solve this problem by making TransactionContextWrapper perform pessimistic limiting as long as the context does not exist. Once the context is materialized, the outstanding operation queue is handed off to it and the context becomes the entity managing the limits. This patch has the side-effect that committing a transaction requires number of permits equal to the number of shards it touches. It also ensures that readAll() is properly throttled. Change-Id: If91816d806bbb3895592e1f42b0b8e389443d0f7 Signed-off-by: Robert Varga (cherry picked from commit 9e7a9b3725ad25f9adf85f0ad796b7cf748795a4) --- .../datastore/AbstractTransactionContext.java | 69 ++++++++++++++++--- .../AbstractTransactionContextFactory.java | 10 +-- .../LocalThreePhaseCommitCohort.java | 10 +++ .../datastore/LocalTransactionContext.java | 49 +++++++------ .../datastore/NoOpTransactionContext.java | 26 ++++--- .../datastore/RemoteTransactionContext.java | 15 ++-- .../RemoteTransactionContextSupport.java | 8 +-- .../cluster/datastore/TransactionContext.java | 10 +++ .../datastore/TransactionContextWrapper.java | 39 +++++++++-- .../cluster/datastore/TransactionProxy.java | 32 +-------- .../PreLithiumTransactionContextImpl.java | 5 +- .../LocalTransactionContextTest.java | 30 +++++--- 12 files changed, 188 insertions(+), 115 deletions(-) diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/AbstractTransactionContext.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/AbstractTransactionContext.java index df478b0630..571899ba14 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/AbstractTransactionContext.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/AbstractTransactionContext.java @@ -7,31 +7,78 @@ */ package org.opendaylight.controller.cluster.datastore; +import com.google.common.base.Preconditions; +import javax.annotation.Nonnull; import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier; import org.slf4j.Logger; import org.slf4j.LoggerFactory; abstract class AbstractTransactionContext implements TransactionContext { - private static final Logger LOG = LoggerFactory.getLogger(AbstractTransactionContext.class); - + private final OperationLimiter limiter; private long modificationCount = 0; + private boolean handoffComplete; + + protected AbstractTransactionContext(final OperationLimiter limiter) { + this.limiter = Preconditions.checkNotNull(limiter); + } - private final TransactionIdentifier identifier; + /** + * Get the transaction identifier associated with this context. + * + * @return Transaction identifier. + */ + @Nonnull protected final TransactionIdentifier getIdentifier() { + return limiter.getIdentifier(); + } - protected AbstractTransactionContext(TransactionIdentifier identifier) { - this.identifier = identifier; + /** + * Return the operation limiter associated with this context. + * @return Operation limiter. + */ + @Nonnull protected final OperationLimiter getLimiter() { + return limiter; } - protected final TransactionIdentifier getIdentifier() { - return identifier; + /** + * Indicate whether all operations have been handed off by the {@link TransactionContextWrapper}. + * + * @return True if this context is responsible for throttling. + */ + protected final boolean isOperationHandoffComplete() { + return handoffComplete; } - protected void incrementModificationCount(){ + /** + * Acquire operation from the limiter if the handoff has completed. If + * the handoff is still ongoing, this method does nothing. + */ + protected final void acquireOperation() { + if (handoffComplete) { + limiter.acquire(); + } + } + + /** + * Acquire operation from the limiter if the handoff has NOT completed. If + * the handoff has completed, this method does nothing. + */ + protected final void releaseOperation() { + if (!handoffComplete) { + limiter.release(); + } + } + + protected final void incrementModificationCount() { modificationCount++; } - protected void logModificationCount(){ - LOG.debug("Total modifications on Tx {} = [ {} ]", identifier, modificationCount); + protected final void logModificationCount() { + LOG.debug("Total modifications on Tx {} = [ {} ]", getIdentifier(), modificationCount); + } + + @Override + public final void operationHandoffComplete() { + handoffComplete = true; } -} \ No newline at end of file +} diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/AbstractTransactionContextFactory.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/AbstractTransactionContextFactory.java index c90a3f6f6f..4dff391535 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/AbstractTransactionContextFactory.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/AbstractTransactionContextFactory.java @@ -85,11 +85,11 @@ abstract class AbstractTransactionContextFactory findPrimaryFuture = findPrimaryShard(shardName); if(findPrimaryFuture.isCompleted()) { @@ -178,7 +178,7 @@ abstract class AbstractTransactionContextFactory initiateCoordinatedCommit() { final Future messageFuture = initiateCommit(false); final Future ret = TransactionReadyReplyMapper.transform(messageFuture, actorContext, diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/LocalTransactionContext.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/LocalTransactionContext.java index dd7d899e0c..9b0accd455 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/LocalTransactionContext.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/LocalTransactionContext.java @@ -13,7 +13,7 @@ import com.google.common.base.Preconditions; import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.SettableFuture; -import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier; +import org.opendaylight.controller.cluster.datastore.utils.ActorContext; import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction; import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransaction; import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction; @@ -28,14 +28,11 @@ import scala.concurrent.Future; * @author Thomas Pantelis */ abstract class LocalTransactionContext extends AbstractTransactionContext { - private final DOMStoreTransaction txDelegate; - private final OperationLimiter limiter; - LocalTransactionContext(TransactionIdentifier identifier, DOMStoreTransaction txDelegate, OperationLimiter limiter) { - super(identifier); + LocalTransactionContext(DOMStoreTransaction txDelegate, OperationLimiter limiter) { + super(limiter); this.txDelegate = Preconditions.checkNotNull(txDelegate); - this.limiter = Preconditions.checkNotNull(limiter); } protected abstract DOMStoreWriteTransaction getWriteDelegate(); @@ -46,36 +43,36 @@ abstract class LocalTransactionContext extends AbstractTransactionContext { public void writeData(YangInstanceIdentifier path, NormalizedNode data) { incrementModificationCount(); getWriteDelegate().write(path, data); - limiter.release(); + releaseOperation(); } @Override public void mergeData(YangInstanceIdentifier path, NormalizedNode data) { incrementModificationCount(); getWriteDelegate().merge(path, data); - limiter.release(); + releaseOperation(); } @Override public void deleteData(YangInstanceIdentifier path) { incrementModificationCount(); getWriteDelegate().delete(path); - limiter.release(); + releaseOperation(); } @Override public void readData(YangInstanceIdentifier path, final SettableFuture>> proxyFuture) { Futures.addCallback(getReadDelegate().read(path), new FutureCallback>>() { @Override - public void onSuccess(Optional> result) { + public void onSuccess(final Optional> result) { proxyFuture.set(result); - limiter.release(); + releaseOperation(); } @Override - public void onFailure(Throwable t) { + public void onFailure(final Throwable t) { proxyFuture.setException(t); - limiter.release(); + releaseOperation(); } }); } @@ -84,34 +81,41 @@ abstract class LocalTransactionContext extends AbstractTransactionContext { public void dataExists(YangInstanceIdentifier path, final SettableFuture proxyFuture) { Futures.addCallback(getReadDelegate().exists(path), new FutureCallback() { @Override - public void onSuccess(Boolean result) { + public void onSuccess(final Boolean result) { proxyFuture.set(result); - limiter.release(); + releaseOperation(); } @Override - public void onFailure(Throwable t) { + public void onFailure(final Throwable t) { proxyFuture.setException(t); - limiter.release(); + releaseOperation(); } }); } private LocalThreePhaseCommitCohort ready() { logModificationCount(); - LocalThreePhaseCommitCohort ready = (LocalThreePhaseCommitCohort) getWriteDelegate().ready(); - limiter.release(); - return ready; + acquireOperation(); + return (LocalThreePhaseCommitCohort) getWriteDelegate().ready(); + } + + @SuppressWarnings({ "unchecked", "rawtypes" }) + private T completeOperation(final ActorContext actorContext, final T operationFuture) { + operationFuture.onComplete(getLimiter(), actorContext.getClientDispatcher()); + return operationFuture; } @Override public Future readyTransaction() { - return ready().initiateCoordinatedCommit(); + final LocalThreePhaseCommitCohort cohort = ready(); + return completeOperation(cohort.getActorContext(), cohort.initiateCoordinatedCommit()); } @Override public Future directCommit() { - return ready().initiateDirectCommit(); + final LocalThreePhaseCommitCohort cohort = ready(); + return completeOperation(cohort.getActorContext(), cohort.initiateDirectCommit()); } @Override @@ -122,5 +126,6 @@ abstract class LocalTransactionContext extends AbstractTransactionContext { @Override public void closeTransaction() { txDelegate.close(); + releaseOperation(); } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/NoOpTransactionContext.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/NoOpTransactionContext.java index ff485cbab1..a142142940 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/NoOpTransactionContext.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/NoOpTransactionContext.java @@ -11,7 +11,6 @@ import akka.actor.ActorSelection; import com.google.common.base.Optional; import com.google.common.util.concurrent.SettableFuture; import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException; -import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier; import org.opendaylight.controller.md.sal.common.api.data.DataStoreUnavailableException; import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; @@ -24,12 +23,10 @@ final class NoOpTransactionContext extends AbstractTransactionContext { private static final Logger LOG = LoggerFactory.getLogger(NoOpTransactionContext.class); private final Throwable failure; - private final OperationLimiter operationLimiter; - public NoOpTransactionContext(Throwable failure, TransactionIdentifier identifier, OperationLimiter operationLimiter) { - super(identifier); + public NoOpTransactionContext(Throwable failure, OperationLimiter limiter) { + super(limiter); this.failure = failure; - this.operationLimiter = operationLimiter; } @Override @@ -45,41 +42,42 @@ final class NoOpTransactionContext extends AbstractTransactionContext { @Override public Future directCommit() { LOG.debug("Tx {} directCommit called, failure: {}", getIdentifier(), failure); - operationLimiter.release(); + releaseOperation(); return akka.dispatch.Futures.failed(failure); } @Override public Future readyTransaction() { LOG.debug("Tx {} readyTransaction called, failure: {}", getIdentifier(), failure); - operationLimiter.release(); + releaseOperation(); return akka.dispatch.Futures.failed(failure); } @Override public void deleteData(YangInstanceIdentifier path) { LOG.debug("Tx {} deleteData called path = {}", getIdentifier(), path); - operationLimiter.release(); + releaseOperation(); } @Override public void mergeData(YangInstanceIdentifier path, NormalizedNode data) { LOG.debug("Tx {} mergeData called path = {}", getIdentifier(), path); - operationLimiter.release(); + releaseOperation(); } @Override public void writeData(YangInstanceIdentifier path, NormalizedNode data) { LOG.debug("Tx {} writeData called path = {}", getIdentifier(), path); - operationLimiter.release(); + releaseOperation(); } @Override public void readData(final YangInstanceIdentifier path, SettableFuture>> proxyFuture) { LOG.debug("Tx {} readData called path = {}", getIdentifier(), path); - operationLimiter.release(); - Throwable t; - if(failure instanceof NoShardLeaderException) { + releaseOperation(); + + final Throwable t; + if (failure instanceof NoShardLeaderException) { t = new DataStoreUnavailableException(failure.getMessage(), failure); } else { t = failure; @@ -90,7 +88,7 @@ final class NoOpTransactionContext extends AbstractTransactionContext { @Override public void dataExists(YangInstanceIdentifier path, SettableFuture proxyFuture) { LOG.debug("Tx {} dataExists called path = {}", getIdentifier(), path); - operationLimiter.release(); + releaseOperation(); proxyFuture.setException(new ReadFailedException("Error checking exists for path " + path, failure)); } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/RemoteTransactionContext.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/RemoteTransactionContext.java index 6bf0f7fc9c..7e8a2a00eb 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/RemoteTransactionContext.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/RemoteTransactionContext.java @@ -12,7 +12,6 @@ import akka.actor.ActorSelection; import akka.dispatch.OnComplete; import com.google.common.base.Optional; import com.google.common.util.concurrent.SettableFuture; -import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier; import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications; import org.opendaylight.controller.cluster.datastore.messages.CloseTransaction; import org.opendaylight.controller.cluster.datastore.messages.DataExists; @@ -46,27 +45,24 @@ public class RemoteTransactionContext extends AbstractTransactionContext { private final boolean isTxActorLocal; private final short remoteTransactionVersion; - private final OperationLimiter operationCompleter; private BatchedModifications batchedModifications; private int totalBatchedModificationsSent; - protected RemoteTransactionContext(ActorSelection actor, TransactionIdentifier identifier, + protected RemoteTransactionContext(ActorSelection actor, ActorContext actorContext, boolean isTxActorLocal, short remoteTransactionVersion, OperationLimiter limiter) { - super(identifier); + super(limiter); this.actor = actor; this.actorContext = actorContext; this.isTxActorLocal = isTxActorLocal; this.remoteTransactionVersion = remoteTransactionVersion; - this.operationCompleter = limiter; } private Future completeOperation(Future operationFuture){ - operationFuture.onComplete(this.operationCompleter, actorContext.getClientDispatcher()); + operationFuture.onComplete(getLimiter(), actorContext.getClientDispatcher()); return operationFuture; } - private ActorSelection getActor() { return actor; } @@ -178,6 +174,7 @@ public class RemoteTransactionContext extends AbstractTransactionContext { public void deleteData(YangInstanceIdentifier path) { LOG.debug("Tx {} deleteData called path = {}", getIdentifier(), path); + acquireOperation(); batchModification(new DeleteModification(path)); } @@ -185,6 +182,7 @@ public class RemoteTransactionContext extends AbstractTransactionContext { public void mergeData(YangInstanceIdentifier path, NormalizedNode data) { LOG.debug("Tx {} mergeData called path = {}", getIdentifier(), path); + acquireOperation(); batchModification(new MergeModification(path, data)); } @@ -192,6 +190,7 @@ public class RemoteTransactionContext extends AbstractTransactionContext { public void writeData(YangInstanceIdentifier path, NormalizedNode data) { LOG.debug("Tx {} writeData called path = {}", getIdentifier(), path); + acquireOperation(); batchModification(new WriteModification(path, data)); } @@ -204,6 +203,7 @@ public class RemoteTransactionContext extends AbstractTransactionContext { // Send any batched modifications. This is necessary to honor the read uncommitted semantics of the // public API contract. + acquireOperation(); sendBatchedModifications(); OnComplete onComplete = new OnComplete() { @@ -246,6 +246,7 @@ public class RemoteTransactionContext extends AbstractTransactionContext { // Send any batched modifications. This is necessary to honor the read uncommitted semantics of the // public API contract. + acquireOperation(); sendBatchedModifications(); OnComplete onComplete = new OnComplete() { diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/RemoteTransactionContextSupport.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/RemoteTransactionContextSupport.java index 9cb062dc1c..984d650a32 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/RemoteTransactionContextSupport.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/RemoteTransactionContextSupport.java @@ -161,7 +161,7 @@ final class RemoteTransactionContextSupport { if(failure != null) { LOG.debug("Tx {} Creating NoOpTransaction because of error", getIdentifier(), failure); - localTransactionContext = new NoOpTransactionContext(failure, getIdentifier(), getOperationLimiter()); + localTransactionContext = new NoOpTransactionContext(failure, getOperationLimiter()); } else if (CreateTransactionReply.SERIALIZABLE_CLASS.equals(response.getClass())) { localTransactionContext = createValidTransactionContext( CreateTransactionReply.fromSerializable(response)); @@ -169,7 +169,7 @@ final class RemoteTransactionContextSupport { IllegalArgumentException exception = new IllegalArgumentException(String.format( "Invalid reply type %s for CreateTransaction", response.getClass())); - localTransactionContext = new NoOpTransactionContext(exception, getIdentifier(), getOperationLimiter()); + localTransactionContext = new NoOpTransactionContext(exception, getOperationLimiter()); } transactionContextAdapter.executePriorTransactionOperations(localTransactionContext); @@ -190,10 +190,10 @@ final class RemoteTransactionContextSupport { final TransactionContext ret; if (remoteTransactionVersion < DataStoreVersions.LITHIUM_VERSION) { - ret = new PreLithiumTransactionContextImpl(transactionPath, transactionActor, getIdentifier(), + ret = new PreLithiumTransactionContextImpl(transactionPath, transactionActor, getActorContext(), isTxActorLocal, remoteTransactionVersion, parent.getLimiter()); } else { - ret = new RemoteTransactionContext(transactionActor, getIdentifier(), getActorContext(), + ret = new RemoteTransactionContext(transactionActor, getActorContext(), isTxActorLocal, remoteTransactionVersion, parent.getLimiter()); } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionContext.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionContext.java index 4eea785964..e5130ed6dc 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionContext.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionContext.java @@ -36,4 +36,14 @@ interface TransactionContext { boolean supportsDirectCommit(); Future directCommit(); + + /** + * Invoked by {@link TransactionContextWrapper} when it has finished handing + * off operations to this context. From this point on, the context is responsible + * for throttling operations. + * + * Implementations can rely on the wrapper calling this operation in a synchronized + * block, so they do not need to ensure visibility of this state transition themselves. + */ + void operationHandoffComplete(); } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionContextWrapper.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionContextWrapper.java index 26d7ff8b02..b08d4192b4 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionContextWrapper.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionContextWrapper.java @@ -7,6 +7,9 @@ */ package org.opendaylight.controller.cluster.datastore; +import akka.actor.ActorSelection; +import akka.dispatch.Futures; +import com.google.common.base.Preconditions; import com.google.common.collect.Lists; import java.util.ArrayList; import java.util.Collection; @@ -15,6 +18,8 @@ import javax.annotation.concurrent.GuardedBy; import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import scala.concurrent.Future; +import scala.concurrent.Promise; /** * A helper class that wraps an eventual TransactionContext instance. Operations destined for the target @@ -37,10 +42,10 @@ class TransactionContextWrapper { */ private volatile TransactionContext transactionContext; - private final TransactionIdentifier identifier; + private final OperationLimiter limiter; - TransactionContextWrapper(final TransactionIdentifier identifier) { - this.identifier = identifier; + TransactionContextWrapper(final OperationLimiter limiter) { + this.limiter = Preconditions.checkNotNull(limiter); } TransactionContext getTransactionContext() { @@ -48,7 +53,7 @@ class TransactionContextWrapper { } TransactionIdentifier getIdentifier() { - return identifier; + return limiter.getIdentifier(); } /** @@ -69,6 +74,8 @@ class TransactionContextWrapper { if (invokeOperation) { operation.invoke(transactionContext); + } else { + limiter.acquire(); } } @@ -95,10 +102,11 @@ class TransactionContextWrapper { // queued (eg a put operation from a client read Future callback that is notified // synchronously). Collection operationsBatch = null; - synchronized(queuedTxOperations) { - if(queuedTxOperations.isEmpty()) { + synchronized (queuedTxOperations) { + if (queuedTxOperations.isEmpty()) { // We're done invoking the TransactionOperations so we can now publish the // TransactionContext. + localTransactionContext.operationHandoffComplete(); transactionContext = localTransactionContext; break; } @@ -110,9 +118,26 @@ class TransactionContextWrapper { // Invoke TransactionOperations outside the sync block to avoid unnecessary blocking. // A slight down-side is that we need to re-acquire the lock below but this should // be negligible. - for(TransactionOperation oper: operationsBatch) { + for (TransactionOperation oper : operationsBatch) { oper.invoke(localTransactionContext); } } } + + Future readyTransaction() { + // avoid the creation of a promise and a TransactionOperation + if (transactionContext != null) { + return transactionContext.readyTransaction(); + } + + final Promise promise = Futures.promise(); + enqueueTransactionOperation(new TransactionOperation() { + @Override + public void invoke(TransactionContext transactionContext) { + promise.completeWith(transactionContext.readyTransaction()); + } + }); + + return promise.future(); + } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionProxy.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionProxy.java index 5aafcfc88f..1bda7810ed 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionProxy.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionProxy.java @@ -78,8 +78,6 @@ public class TransactionProxy extends AbstractDOMStoreTransaction proxyFuture = SettableFuture.create(); TransactionContextWrapper contextAdapter = getContextAdapter(path); contextAdapter.maybeExecuteTransactionOperation(new TransactionOperation() { @@ -101,8 +99,6 @@ public class TransactionProxy extends AbstractDOMStoreTransaction createSingleCommitCohort(final String shardName, final TransactionContextWrapper contextAdapter) { - limiter.acquire(); LOG.debug("Tx {} Readying transaction for shard {}", getIdentifier(), shardName); @@ -308,30 +297,11 @@ public class TransactionProxy extends AbstractDOMStoreTransaction createMultiCommitCohort( final Set> txContextAdapterEntries) { - limiter.acquire(); final List> cohortFutures = new ArrayList<>(txContextAdapterEntries.size()); for (Entry e : txContextAdapterEntries) { LOG.debug("Tx {} Readying transaction for shard {}", getIdentifier(), e.getKey()); - TransactionContextWrapper contextAdapter = e.getValue(); - final TransactionContext transactionContext = contextAdapter.getTransactionContext(); - Future future; - if (transactionContext != null) { - // avoid the creation of a promise and a TransactionOperation - future = transactionContext.readyTransaction(); - } else { - final Promise promise = akka.dispatch.Futures.promise(); - contextAdapter.maybeExecuteTransactionOperation(new TransactionOperation() { - @Override - public void invoke(TransactionContext transactionContext) { - promise.completeWith(transactionContext.readyTransaction()); - } - }); - - future = promise.future(); - } - - cohortFutures.add(future); + cohortFutures.add(e.getValue().readyTransaction()); } return new ThreePhaseCommitCohortProxy(txContextFactory.getActorContext(), cohortFutures, getIdentifier().toString()); diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/compat/PreLithiumTransactionContextImpl.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/compat/PreLithiumTransactionContextImpl.java index 4de8ab721f..733543aabd 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/compat/PreLithiumTransactionContextImpl.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/compat/PreLithiumTransactionContextImpl.java @@ -11,7 +11,6 @@ import akka.actor.ActorSelection; import org.opendaylight.controller.cluster.datastore.DataStoreVersions; import org.opendaylight.controller.cluster.datastore.OperationLimiter; import org.opendaylight.controller.cluster.datastore.RemoteTransactionContext; -import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier; import org.opendaylight.controller.cluster.datastore.messages.DeleteData; import org.opendaylight.controller.cluster.datastore.messages.MergeData; import org.opendaylight.controller.cluster.datastore.messages.ReadyTransaction; @@ -35,10 +34,10 @@ public class PreLithiumTransactionContextImpl extends RemoteTransactionContext { private final String transactionPath; - public PreLithiumTransactionContextImpl(String transactionPath, ActorSelection actor, TransactionIdentifier identifier, + public PreLithiumTransactionContextImpl(String transactionPath, ActorSelection actor, ActorContext actorContext, boolean isTxActorLocal, short remoteTransactionVersion, OperationLimiter limiter) { - super(actor, identifier, actorContext, isTxActorLocal, remoteTransactionVersion, limiter); + super(actor, actorContext, isTxActorLocal, remoteTransactionVersion, limiter); this.transactionPath = transactionPath; } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/LocalTransactionContextTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/LocalTransactionContextTest.java index d8f74dd832..b04612ec85 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/LocalTransactionContextTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/LocalTransactionContextTest.java @@ -3,16 +3,19 @@ package org.opendaylight.controller.cluster.datastore; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.verify; +import akka.dispatch.ExecutionContexts; import com.google.common.base.Optional; import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.MoreExecutors; import com.google.common.util.concurrent.SettableFuture; import org.junit.Before; import org.junit.Test; import org.mockito.Mock; import org.mockito.MockitoAnnotations; import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier; -import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction; +import org.opendaylight.controller.cluster.datastore.utils.ActorContext; import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction; +import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction; import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; @@ -31,9 +34,9 @@ public class LocalTransactionContextTest { LocalTransactionContext localTransactionContext; @Before - public void setUp(){ + public void setUp() { MockitoAnnotations.initMocks(this); - localTransactionContext = new LocalTransactionContext(identifier, readWriteTransaction, limiter) { + localTransactionContext = new LocalTransactionContext(readWriteTransaction, limiter) { @Override protected DOMStoreWriteTransaction getWriteDelegate() { return readWriteTransaction; @@ -47,7 +50,7 @@ public class LocalTransactionContextTest { } @Test - public void testWrite(){ + public void testWrite() { YangInstanceIdentifier yangInstanceIdentifier = YangInstanceIdentifier.builder().build(); NormalizedNode normalizedNode = mock(NormalizedNode.class); localTransactionContext.writeData(yangInstanceIdentifier, normalizedNode); @@ -56,7 +59,7 @@ public class LocalTransactionContextTest { } @Test - public void testMerge(){ + public void testMerge() { YangInstanceIdentifier yangInstanceIdentifier = YangInstanceIdentifier.builder().build(); NormalizedNode normalizedNode = mock(NormalizedNode.class); localTransactionContext.mergeData(yangInstanceIdentifier, normalizedNode); @@ -65,7 +68,7 @@ public class LocalTransactionContextTest { } @Test - public void testDelete(){ + public void testDelete() { YangInstanceIdentifier yangInstanceIdentifier = YangInstanceIdentifier.builder().build(); localTransactionContext.deleteData(yangInstanceIdentifier); verify(limiter).release(); @@ -74,7 +77,7 @@ public class LocalTransactionContextTest { @Test - public void testRead(){ + public void testRead() { YangInstanceIdentifier yangInstanceIdentifier = YangInstanceIdentifier.builder().build(); NormalizedNode normalizedNode = mock(NormalizedNode.class); doReturn(Futures.immediateCheckedFuture(Optional.of(normalizedNode))).when(readWriteTransaction).read(yangInstanceIdentifier); @@ -84,7 +87,7 @@ public class LocalTransactionContextTest { } @Test - public void testExists(){ + public void testExists() { YangInstanceIdentifier yangInstanceIdentifier = YangInstanceIdentifier.builder().build(); doReturn(Futures.immediateCheckedFuture(true)).when(readWriteTransaction).exists(yangInstanceIdentifier); localTransactionContext.dataExists(yangInstanceIdentifier, SettableFuture. create()); @@ -93,10 +96,15 @@ public class LocalTransactionContextTest { } @Test - public void testReady(){ - doReturn(mock(LocalThreePhaseCommitCohort.class)).when(readWriteTransaction).ready(); + public void testReady() { + final LocalThreePhaseCommitCohort mockCohort = mock(LocalThreePhaseCommitCohort.class); + final ActorContext mockContext = mock(ActorContext.class); + doReturn(mockContext).when(mockCohort).getActorContext(); + doReturn(ExecutionContexts.fromExecutor(MoreExecutors.directExecutor())).when(mockContext).getClientDispatcher(); + doReturn(akka.dispatch.Futures.successful(null)).when(mockCohort).initiateCoordinatedCommit(); + doReturn(mockCohort).when(readWriteTransaction).ready(); localTransactionContext.readyTransaction(); - verify(limiter).release(); + verify(limiter).onComplete(null, null); verify(readWriteTransaction).ready(); } -- 2.36.6