From 21ccb7510c28e824d6441d48604aec7467d44710 Mon Sep 17 00:00:00 2001 From: Moiz Raja Date: Tue, 9 Jun 2015 18:37:35 -0700 Subject: [PATCH] BUG 3019 : Fix Operation throttling for modification batching scenarios This patch straightens out where exactly limiting is done. A TransactionProxy creates a TransactionContext for every shard on which a transaction needs to be done. There are 3 types of TransactionContexts. NoOpTransactionContext, LocalTransactionContext and RemoteTransactionContext. When a operation is done on TransactionProxy it does not know which of these TransactionContexts it should create so it first createas a TransactionContextWrapper. All operations on TransactionProxy are then queued up in the TransactionContextWrapper till we determine which TransactionContext to create. This patch creates an OperationLimiter per TransactionContextWrapper. Everytime an operation is enqueued we acquire a permit. When the TransactionContext is finally created we do different things depending on the TransactionContext. For NoOp and Local TransactionContexts we completely ignore the limiter - that is for these TransactionContexts there is no limiting done. For RemoteTransactionContext we do limiting. RemoteTransactionContext does not acquire Operation permits till it is made visible by the TransactionContextWrapper - this is signaled be the setting of the handOffComplete flag in AbstractTransactionContext. After that RemoteTransactionContext takes over the business of acquiring permits. OperationLimiter which also serves as the Operation completion handler is the only component that releases the permits. Another thing which this patch addresses is which configuration option we use for operation limiting. We now use ShardBatchedModificationCount instead of the mailbox limit from akka.conf. This removes the possibility of mis-configuration where making ShardedBatchedModificationCount higher than mailbox limit could cause unexpected blocking. Change-Id: I571ba5278630e5166be6bcb3ff8e1c527c5e3343 Signed-off-by: Moiz Raja --- .../src/main/resources/initial/datastore.cfg | 2 +- .../datastore/AbstractTransactionContext.java | 61 +++++-------------- .../AbstractTransactionContextFactory.java | 15 +++-- .../cluster/datastore/DatastoreContext.java | 2 +- .../datastore/LocalTransactionContext.java | 25 ++------ .../datastore/NoOpTransactionContext.java | 12 +--- .../cluster/datastore/OperationLimiter.java | 19 +++--- .../datastore/RemoteTransactionContext.java | 25 +++++++- .../RemoteTransactionContextSupport.java | 14 ++--- .../cluster/datastore/TransactionContext.java | 8 ++- .../datastore/TransactionContextWrapper.java | 23 +++++-- .../cluster/datastore/TransactionProxy.java | 10 --- .../PreLithiumTransactionContextImpl.java | 5 +- .../cluster/datastore/utils/ActorContext.java | 15 ----- .../yang/distributed-datastore-provider.yang | 2 +- .../AbstractTransactionProxyTest.java | 18 ++++-- .../LocalTransactionContextTest.java | 8 +-- .../datastore/OperationLimiterTest.java | 13 ++-- .../TransactionContextWrapperTest.java | 44 +++++++++++++ .../datastore/TransactionProxyTest.java | 41 ++++++------- 20 files changed, 188 insertions(+), 174 deletions(-) create mode 100644 opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/TransactionContextWrapperTest.java diff --git a/opendaylight/md-sal/sal-clustering-config/src/main/resources/initial/datastore.cfg b/opendaylight/md-sal/sal-clustering-config/src/main/resources/initial/datastore.cfg index cfbf9450aa..e27376290e 100644 --- a/opendaylight/md-sal/sal-clustering-config/src/main/resources/initial/datastore.cfg +++ b/opendaylight/md-sal/sal-clustering-config/src/main/resources/initial/datastore.cfg @@ -51,7 +51,7 @@ operational.persistent=false # The number of transaction modification operations (put, merge, delete) to batch before sending to the # shard transaction actor. Batching improves performance as less modifications messages are sent to the # actor and thus lessens the chance that the transaction actor's mailbox queue could get full. -#shard-batched-modification-count=100 +#shard-batched-modification-count=1000 # The maximum amount of time for akka operations (remote or local) to complete before failing. #operation-timeout-in-seconds=5 diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/AbstractTransactionContext.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/AbstractTransactionContext.java index 571899ba14..97a0205ff2 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/AbstractTransactionContext.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/AbstractTransactionContext.java @@ -7,7 +7,6 @@ */ package org.opendaylight.controller.cluster.datastore; -import com.google.common.base.Preconditions; import javax.annotation.Nonnull; import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier; import org.slf4j.Logger; @@ -15,12 +14,12 @@ import org.slf4j.LoggerFactory; abstract class AbstractTransactionContext implements TransactionContext { private static final Logger LOG = LoggerFactory.getLogger(AbstractTransactionContext.class); - private final OperationLimiter limiter; + private final TransactionIdentifier transactionIdentifier; private long modificationCount = 0; - private boolean handoffComplete; + private boolean handOffComplete; - protected AbstractTransactionContext(final OperationLimiter limiter) { - this.limiter = Preconditions.checkNotNull(limiter); + protected AbstractTransactionContext(TransactionIdentifier transactionIdentifier) { + this.transactionIdentifier = transactionIdentifier; } /** @@ -29,44 +28,7 @@ abstract class AbstractTransactionContext implements TransactionContext { * @return Transaction identifier. */ @Nonnull protected final TransactionIdentifier getIdentifier() { - return limiter.getIdentifier(); - } - - /** - * Return the operation limiter associated with this context. - * @return Operation limiter. - */ - @Nonnull protected final OperationLimiter getLimiter() { - return limiter; - } - - /** - * Indicate whether all operations have been handed off by the {@link TransactionContextWrapper}. - * - * @return True if this context is responsible for throttling. - */ - protected final boolean isOperationHandoffComplete() { - return handoffComplete; - } - - /** - * Acquire operation from the limiter if the handoff has completed. If - * the handoff is still ongoing, this method does nothing. - */ - protected final void acquireOperation() { - if (handoffComplete) { - limiter.acquire(); - } - } - - /** - * Acquire operation from the limiter if the handoff has NOT completed. If - * the handoff has completed, this method does nothing. - */ - protected final void releaseOperation() { - if (!handoffComplete) { - limiter.release(); - } + return transactionIdentifier; } protected final void incrementModificationCount() { @@ -78,7 +40,16 @@ abstract class AbstractTransactionContext implements TransactionContext { } @Override - public final void operationHandoffComplete() { - handoffComplete = true; + public final void operationHandOffComplete() { + handOffComplete = true; + } + + protected boolean isOperationHandOffComplete(){ + return handOffComplete; + } + + @Override + public boolean usesOperationLimiting() { + return false; } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/AbstractTransactionContextFactory.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/AbstractTransactionContextFactory.java index 976e613e8e..19646f27fc 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/AbstractTransactionContextFactory.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/AbstractTransactionContextFactory.java @@ -85,11 +85,12 @@ abstract class AbstractTransactionContextFactory findPrimaryFuture = findPrimaryShard(shardName); if(findPrimaryFuture.isCompleted()) { @@ -174,11 +175,13 @@ abstract class AbstractTransactionContextFactory void onTransactionReady(@Nonnull TransactionIdentifier transaction, @Nonnull Collection> cohortFutures); - private static TransactionContext createLocalTransactionContext(final LocalTransactionFactory factory, final TransactionProxy parent) { + private static TransactionContext createLocalTransactionContext(final LocalTransactionFactory factory, + final TransactionProxy parent) { + switch(parent.getType()) { case READ_ONLY: final DOMStoreReadTransaction readOnly = factory.newReadOnlyTransaction(parent.getIdentifier()); - return new LocalTransactionContext(readOnly, parent.getLimiter()) { + return new LocalTransactionContext(readOnly, parent.getIdentifier()) { @Override protected DOMStoreWriteTransaction getWriteDelegate() { throw new UnsupportedOperationException(); @@ -191,7 +194,7 @@ abstract class AbstractTransactionContextFactory globalDatastoreTypes = Sets.newConcurrentHashSet(); diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/LocalTransactionContext.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/LocalTransactionContext.java index 9b0accd455..e62d15b7ff 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/LocalTransactionContext.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/LocalTransactionContext.java @@ -13,7 +13,7 @@ import com.google.common.base.Preconditions; import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.SettableFuture; -import org.opendaylight.controller.cluster.datastore.utils.ActorContext; +import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier; import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction; import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransaction; import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction; @@ -30,8 +30,8 @@ import scala.concurrent.Future; abstract class LocalTransactionContext extends AbstractTransactionContext { private final DOMStoreTransaction txDelegate; - LocalTransactionContext(DOMStoreTransaction txDelegate, OperationLimiter limiter) { - super(limiter); + LocalTransactionContext(DOMStoreTransaction txDelegate, TransactionIdentifier identifier) { + super(identifier); this.txDelegate = Preconditions.checkNotNull(txDelegate); } @@ -43,21 +43,18 @@ abstract class LocalTransactionContext extends AbstractTransactionContext { public void writeData(YangInstanceIdentifier path, NormalizedNode data) { incrementModificationCount(); getWriteDelegate().write(path, data); - releaseOperation(); } @Override public void mergeData(YangInstanceIdentifier path, NormalizedNode data) { incrementModificationCount(); getWriteDelegate().merge(path, data); - releaseOperation(); } @Override public void deleteData(YangInstanceIdentifier path) { incrementModificationCount(); getWriteDelegate().delete(path); - releaseOperation(); } @Override @@ -66,13 +63,11 @@ abstract class LocalTransactionContext extends AbstractTransactionContext { @Override public void onSuccess(final Optional> result) { proxyFuture.set(result); - releaseOperation(); } @Override public void onFailure(final Throwable t) { proxyFuture.setException(t); - releaseOperation(); } }); } @@ -83,39 +78,30 @@ abstract class LocalTransactionContext extends AbstractTransactionContext { @Override public void onSuccess(final Boolean result) { proxyFuture.set(result); - releaseOperation(); } @Override public void onFailure(final Throwable t) { proxyFuture.setException(t); - releaseOperation(); } }); } private LocalThreePhaseCommitCohort ready() { logModificationCount(); - acquireOperation(); return (LocalThreePhaseCommitCohort) getWriteDelegate().ready(); } - @SuppressWarnings({ "unchecked", "rawtypes" }) - private T completeOperation(final ActorContext actorContext, final T operationFuture) { - operationFuture.onComplete(getLimiter(), actorContext.getClientDispatcher()); - return operationFuture; - } - @Override public Future readyTransaction() { final LocalThreePhaseCommitCohort cohort = ready(); - return completeOperation(cohort.getActorContext(), cohort.initiateCoordinatedCommit()); + return cohort.initiateCoordinatedCommit(); } @Override public Future directCommit() { final LocalThreePhaseCommitCohort cohort = ready(); - return completeOperation(cohort.getActorContext(), cohort.initiateDirectCommit()); + return cohort.initiateDirectCommit(); } @Override @@ -126,6 +112,5 @@ abstract class LocalTransactionContext extends AbstractTransactionContext { @Override public void closeTransaction() { txDelegate.close(); - releaseOperation(); } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/NoOpTransactionContext.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/NoOpTransactionContext.java index a142142940..2094cd2f77 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/NoOpTransactionContext.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/NoOpTransactionContext.java @@ -11,6 +11,7 @@ import akka.actor.ActorSelection; import com.google.common.base.Optional; import com.google.common.util.concurrent.SettableFuture; import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException; +import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier; import org.opendaylight.controller.md.sal.common.api.data.DataStoreUnavailableException; import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; @@ -24,8 +25,8 @@ final class NoOpTransactionContext extends AbstractTransactionContext { private final Throwable failure; - public NoOpTransactionContext(Throwable failure, OperationLimiter limiter) { - super(limiter); + public NoOpTransactionContext(Throwable failure, TransactionIdentifier identifier) { + super(identifier); this.failure = failure; } @@ -42,39 +43,33 @@ final class NoOpTransactionContext extends AbstractTransactionContext { @Override public Future directCommit() { LOG.debug("Tx {} directCommit called, failure: {}", getIdentifier(), failure); - releaseOperation(); return akka.dispatch.Futures.failed(failure); } @Override public Future readyTransaction() { LOG.debug("Tx {} readyTransaction called, failure: {}", getIdentifier(), failure); - releaseOperation(); return akka.dispatch.Futures.failed(failure); } @Override public void deleteData(YangInstanceIdentifier path) { LOG.debug("Tx {} deleteData called path = {}", getIdentifier(), path); - releaseOperation(); } @Override public void mergeData(YangInstanceIdentifier path, NormalizedNode data) { LOG.debug("Tx {} mergeData called path = {}", getIdentifier(), path); - releaseOperation(); } @Override public void writeData(YangInstanceIdentifier path, NormalizedNode data) { LOG.debug("Tx {} writeData called path = {}", getIdentifier(), path); - releaseOperation(); } @Override public void readData(final YangInstanceIdentifier path, SettableFuture>> proxyFuture) { LOG.debug("Tx {} readData called path = {}", getIdentifier(), path); - releaseOperation(); final Throwable t; if (failure instanceof NoShardLeaderException) { @@ -88,7 +83,6 @@ final class NoOpTransactionContext extends AbstractTransactionContext { @Override public void dataExists(YangInstanceIdentifier path, SettableFuture proxyFuture) { LOG.debug("Tx {} dataExists called path = {}", getIdentifier(), path); - releaseOperation(); proxyFuture.setException(new ReadFailedException("Error checking exists for path " + path, failure)); } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/OperationLimiter.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/OperationLimiter.java index b42230971b..34a7ebf8f2 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/OperationLimiter.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/OperationLimiter.java @@ -26,6 +26,7 @@ public class OperationLimiter extends OnComplete { private final TransactionIdentifier identifier; private final long acquireTimeout; private final Semaphore semaphore; + private final int maxPermits; OperationLimiter(final TransactionIdentifier identifier, final int maxPermits, final int acquireTimeoutSeconds) { this.identifier = Preconditions.checkNotNull(identifier); @@ -34,6 +35,7 @@ public class OperationLimiter extends OnComplete { this.acquireTimeout = TimeUnit.SECONDS.toNanos(acquireTimeoutSeconds); Preconditions.checkArgument(maxPermits >= 0); + this.maxPermits = maxPermits; this.semaphore = new Semaphore(maxPermits); } @@ -41,7 +43,7 @@ public class OperationLimiter extends OnComplete { acquire(1); } - private void acquire(final int acquirePermits) { + void acquire(final int acquirePermits) { try { if (!semaphore.tryAcquire(acquirePermits, acquireTimeout, TimeUnit.NANOSECONDS)) { LOG.warn("Failed to acquire operation permit for transaction {}", identifier); @@ -55,10 +57,6 @@ public class OperationLimiter extends OnComplete { } } - void release() { - this.semaphore.release(); - } - @Override public void onComplete(final Throwable throwable, final Object message) { if (message instanceof BatchedModificationsReply) { @@ -73,7 +71,14 @@ public class OperationLimiter extends OnComplete { } @VisibleForTesting - Semaphore getSemaphore() { - return semaphore; + int availablePermits(){ + return semaphore.availablePermits(); + } + + /** + * Release all the permits + */ + public void releaseAll() { + this.semaphore.release(maxPermits-availablePermits()); } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/RemoteTransactionContext.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/RemoteTransactionContext.java index 7e8a2a00eb..20074c1028 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/RemoteTransactionContext.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/RemoteTransactionContext.java @@ -11,7 +11,9 @@ package org.opendaylight.controller.cluster.datastore; import akka.actor.ActorSelection; import akka.dispatch.OnComplete; import com.google.common.base.Optional; +import com.google.common.base.Preconditions; import com.google.common.util.concurrent.SettableFuture; +import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier; import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications; import org.opendaylight.controller.cluster.datastore.messages.CloseTransaction; import org.opendaylight.controller.cluster.datastore.messages.DataExists; @@ -44,14 +46,16 @@ public class RemoteTransactionContext extends AbstractTransactionContext { private final ActorSelection actor; private final boolean isTxActorLocal; private final short remoteTransactionVersion; + private final OperationLimiter limiter; private BatchedModifications batchedModifications; private int totalBatchedModificationsSent; - protected RemoteTransactionContext(ActorSelection actor, + protected RemoteTransactionContext(TransactionIdentifier identifier, ActorSelection actor, ActorContext actorContext, boolean isTxActorLocal, short remoteTransactionVersion, OperationLimiter limiter) { - super(limiter); + super(identifier); + this.limiter = Preconditions.checkNotNull(limiter); this.actor = actor; this.actorContext = actorContext; this.isTxActorLocal = isTxActorLocal; @@ -59,7 +63,7 @@ public class RemoteTransactionContext extends AbstractTransactionContext { } private Future completeOperation(Future operationFuture){ - operationFuture.onComplete(getLimiter(), actorContext.getClientDispatcher()); + operationFuture.onComplete(limiter, actorContext.getClientDispatcher()); return operationFuture; } @@ -277,4 +281,19 @@ public class RemoteTransactionContext extends AbstractTransactionContext { future.onComplete(onComplete, actorContext.getClientDispatcher()); } + + /** + * Acquire operation from the limiter if the hand-off has completed. If + * the hand-off is still ongoing, this method does nothing. + */ + private final void acquireOperation() { + if (isOperationHandOffComplete()) { + limiter.acquire(); + } + } + + @Override + public boolean usesOperationLimiting() { + return true; + } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/RemoteTransactionContextSupport.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/RemoteTransactionContextSupport.java index afd748fd48..176073ef70 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/RemoteTransactionContextSupport.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/RemoteTransactionContextSupport.java @@ -75,7 +75,7 @@ final class RemoteTransactionContextSupport { } private OperationLimiter getOperationLimiter() { - return parent.getLimiter(); + return transactionContextAdapter.getLimiter(); } private TransactionIdentifier getIdentifier() { @@ -160,7 +160,7 @@ final class RemoteTransactionContextSupport { if(failure != null) { LOG.debug("Tx {} Creating NoOpTransaction because of error", getIdentifier(), failure); - localTransactionContext = new NoOpTransactionContext(failure, getOperationLimiter()); + localTransactionContext = new NoOpTransactionContext(failure, getIdentifier()); } else if (CreateTransactionReply.SERIALIZABLE_CLASS.equals(response.getClass())) { localTransactionContext = createValidTransactionContext( CreateTransactionReply.fromSerializable(response)); @@ -168,7 +168,7 @@ final class RemoteTransactionContextSupport { IllegalArgumentException exception = new IllegalArgumentException(String.format( "Invalid reply type %s for CreateTransaction", response.getClass())); - localTransactionContext = new NoOpTransactionContext(exception, getOperationLimiter()); + localTransactionContext = new NoOpTransactionContext(exception, getIdentifier()); } transactionContextAdapter.executePriorTransactionOperations(localTransactionContext); @@ -189,11 +189,11 @@ final class RemoteTransactionContextSupport { final TransactionContext ret; if (remoteTransactionVersion < DataStoreVersions.LITHIUM_VERSION) { - ret = new PreLithiumTransactionContextImpl(transactionPath, transactionActor, - getActorContext(), isTxActorLocal, remoteTransactionVersion, parent.getLimiter()); + ret = new PreLithiumTransactionContextImpl(transactionContextAdapter.getIdentifier(), transactionPath, transactionActor, + getActorContext(), isTxActorLocal, remoteTransactionVersion, transactionContextAdapter.getLimiter()); } else { - ret = new RemoteTransactionContext(transactionActor, getActorContext(), - isTxActorLocal, remoteTransactionVersion, parent.getLimiter()); + ret = new RemoteTransactionContext(transactionContextAdapter.getIdentifier(), transactionActor, getActorContext(), + isTxActorLocal, remoteTransactionVersion, transactionContextAdapter.getLimiter()); } if(parent.getType() == TransactionType.READ_ONLY) { diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionContext.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionContext.java index e5130ed6dc..6a542002d0 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionContext.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionContext.java @@ -45,5 +45,11 @@ interface TransactionContext { * Implementations can rely on the wrapper calling this operation in a synchronized * block, so they do not need to ensure visibility of this state transition themselves. */ - void operationHandoffComplete(); + void operationHandOffComplete(); + + /** + * A TransactionContext that uses Operation limiting should return true else false + * @return + */ + boolean usesOperationLimiting(); } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionContextWrapper.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionContextWrapper.java index b08d4192b4..89a6a97fd6 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionContextWrapper.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionContextWrapper.java @@ -16,6 +16,7 @@ import java.util.Collection; import java.util.List; import javax.annotation.concurrent.GuardedBy; import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier; +import org.opendaylight.controller.cluster.datastore.utils.ActorContext; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import scala.concurrent.Future; @@ -37,6 +38,8 @@ class TransactionContextWrapper { @GuardedBy("queuedTxOperations") private final List queuedTxOperations = Lists.newArrayList(); + private final TransactionIdentifier identifier; + /** * The resulting TransactionContext. */ @@ -44,8 +47,11 @@ class TransactionContextWrapper { private final OperationLimiter limiter; - TransactionContextWrapper(final OperationLimiter limiter) { - this.limiter = Preconditions.checkNotNull(limiter); + TransactionContextWrapper(TransactionIdentifier identifier, final ActorContext actorContext) { + this.identifier = Preconditions.checkNotNull(identifier); + this.limiter = new OperationLimiter(identifier, + actorContext.getDatastoreContext().getShardBatchedModificationCount() + 1, // 1 extra permit for the ready operation + actorContext.getDatastoreContext().getOperationTimeoutInSeconds()); } TransactionContext getTransactionContext() { @@ -53,7 +59,7 @@ class TransactionContextWrapper { } TransactionIdentifier getIdentifier() { - return limiter.getIdentifier(); + return identifier; } /** @@ -106,7 +112,10 @@ class TransactionContextWrapper { if (queuedTxOperations.isEmpty()) { // We're done invoking the TransactionOperations so we can now publish the // TransactionContext. - localTransactionContext.operationHandoffComplete(); + localTransactionContext.operationHandOffComplete(); + if(!localTransactionContext.usesOperationLimiting()){ + limiter.releaseAll(); + } transactionContext = localTransactionContext; break; } @@ -140,4 +149,10 @@ class TransactionContextWrapper { return promise.future(); } + + public OperationLimiter getLimiter() { + return limiter; + } + + } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionProxy.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionProxy.java index 1bda7810ed..f7cb27b07f 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionProxy.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionProxy.java @@ -53,7 +53,6 @@ public class TransactionProxy extends AbstractDOMStoreTransaction txContextAdapters = new HashMap<>(); private final AbstractTransactionContextFactory txContextFactory; - private final OperationLimiter limiter; private final TransactionType type; private TransactionState state = TransactionState.OPEN; @@ -64,11 +63,6 @@ public class TransactionProxy extends AbstractDOMStoreTransaction getModuleNameFromNameSpace(String nameSpace) { - return TestModel.JUNK_QNAME.getNamespace().toASCIIString().equals(nameSpace) ? - Optional.of("junk") : Optional.absent(); + if(TestModel.JUNK_QNAME.getNamespace().toASCIIString().equals(nameSpace)) { + return Optional.of("junk"); + } else if(CarsModel.BASE_QNAME.getNamespace().toASCIIString().equals(nameSpace)){ + return Optional.of("cars"); + } + return Optional.absent(); } }; @@ -151,7 +162,6 @@ public abstract class AbstractTransactionProxyTest { doReturn(mockClusterWrapper).when(mockActorContext).getClusterWrapper(); doReturn(mockClusterWrapper).when(mockActorContext).getClusterWrapper(); doReturn(dataStoreContextBuilder.build()).when(mockActorContext).getDatastoreContext(); - doReturn(10).when(mockActorContext).getTransactionOutstandingOperationLimit(); mockComponentFactory = TransactionContextFactory.create(mockActorContext); @@ -342,8 +352,6 @@ public abstract class AbstractTransactionProxyTest { doReturn(false).when(mockActorContext).isPathLocal(actorRef.path().toString()); - doReturn(10).when(mockActorContext).getTransactionOutstandingOperationLimit(); - return actorRef; } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/LocalTransactionContextTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/LocalTransactionContextTest.java index b04612ec85..0545bbae36 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/LocalTransactionContextTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/LocalTransactionContextTest.java @@ -36,7 +36,7 @@ public class LocalTransactionContextTest { @Before public void setUp() { MockitoAnnotations.initMocks(this); - localTransactionContext = new LocalTransactionContext(readWriteTransaction, limiter) { + localTransactionContext = new LocalTransactionContext(readWriteTransaction, limiter.getIdentifier()) { @Override protected DOMStoreWriteTransaction getWriteDelegate() { return readWriteTransaction; @@ -54,7 +54,6 @@ public class LocalTransactionContextTest { YangInstanceIdentifier yangInstanceIdentifier = YangInstanceIdentifier.builder().build(); NormalizedNode normalizedNode = mock(NormalizedNode.class); localTransactionContext.writeData(yangInstanceIdentifier, normalizedNode); - verify(limiter).release(); verify(readWriteTransaction).write(yangInstanceIdentifier, normalizedNode); } @@ -63,7 +62,6 @@ public class LocalTransactionContextTest { YangInstanceIdentifier yangInstanceIdentifier = YangInstanceIdentifier.builder().build(); NormalizedNode normalizedNode = mock(NormalizedNode.class); localTransactionContext.mergeData(yangInstanceIdentifier, normalizedNode); - verify(limiter).release(); verify(readWriteTransaction).merge(yangInstanceIdentifier, normalizedNode); } @@ -71,7 +69,6 @@ public class LocalTransactionContextTest { public void testDelete() { YangInstanceIdentifier yangInstanceIdentifier = YangInstanceIdentifier.builder().build(); localTransactionContext.deleteData(yangInstanceIdentifier); - verify(limiter).release(); verify(readWriteTransaction).delete(yangInstanceIdentifier); } @@ -82,7 +79,6 @@ public class LocalTransactionContextTest { NormalizedNode normalizedNode = mock(NormalizedNode.class); doReturn(Futures.immediateCheckedFuture(Optional.of(normalizedNode))).when(readWriteTransaction).read(yangInstanceIdentifier); localTransactionContext.readData(yangInstanceIdentifier, SettableFuture.>>create()); - verify(limiter).release(); verify(readWriteTransaction).read(yangInstanceIdentifier); } @@ -91,7 +87,6 @@ public class LocalTransactionContextTest { YangInstanceIdentifier yangInstanceIdentifier = YangInstanceIdentifier.builder().build(); doReturn(Futures.immediateCheckedFuture(true)).when(readWriteTransaction).exists(yangInstanceIdentifier); localTransactionContext.dataExists(yangInstanceIdentifier, SettableFuture. create()); - verify(limiter).release(); verify(readWriteTransaction).exists(yangInstanceIdentifier); } @@ -104,7 +99,6 @@ public class LocalTransactionContextTest { doReturn(akka.dispatch.Futures.successful(null)).when(mockCohort).initiateCoordinatedCommit(); doReturn(mockCohort).when(readWriteTransaction).ready(); localTransactionContext.readyTransaction(); - verify(limiter).onComplete(null, null); verify(readWriteTransaction).ready(); } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/OperationLimiterTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/OperationLimiterTest.java index 40ce84b234..7d49090f68 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/OperationLimiterTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/OperationLimiterTest.java @@ -8,7 +8,6 @@ package org.opendaylight.controller.cluster.datastore; import static org.junit.Assert.assertEquals; -import java.util.concurrent.Semaphore; import org.junit.Test; import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier; import org.opendaylight.controller.cluster.datastore.messages.BatchedModificationsReply; @@ -25,21 +24,21 @@ public class OperationLimiterTest { public void testOnComplete() throws Exception { int permits = 10; OperationLimiter limiter = new OperationLimiter(new TransactionIdentifier("foo", 1), permits, 1); - Semaphore semaphore = limiter.getSemaphore(); - semaphore.acquire(permits); + limiter.acquire(permits); int availablePermits = 0; limiter.onComplete(null, DataExistsReply.create(true)); - assertEquals("availablePermits", ++availablePermits, semaphore.availablePermits()); + assertEquals("availablePermits", ++availablePermits, limiter.availablePermits()); limiter.onComplete(null, DataExistsReply.create(true)); - assertEquals("availablePermits", ++availablePermits, semaphore.availablePermits()); + assertEquals("availablePermits", ++availablePermits, limiter.availablePermits()); limiter.onComplete(null, new IllegalArgumentException()); - assertEquals("availablePermits", ++availablePermits, semaphore.availablePermits()); + assertEquals("availablePermits", ++availablePermits, limiter.availablePermits()); limiter.onComplete(null, new BatchedModificationsReply(4)); availablePermits += 4; - assertEquals("availablePermits", availablePermits, semaphore.availablePermits()); + assertEquals("availablePermits", availablePermits, limiter.availablePermits()); } + } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/TransactionContextWrapperTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/TransactionContextWrapperTest.java new file mode 100644 index 0000000000..bc98f9ac3f --- /dev/null +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/TransactionContextWrapperTest.java @@ -0,0 +1,44 @@ +package org.opendaylight.controller.cluster.datastore; + +import static org.junit.Assert.assertEquals; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.mock; +import org.junit.Before; +import org.junit.Test; +import org.mockito.Mock; +import org.mockito.MockitoAnnotations; +import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier; +import org.opendaylight.controller.cluster.datastore.utils.ActorContext; + +public class TransactionContextWrapperTest { + + @Mock + TransactionIdentifier identifier; + + @Mock + ActorContext actorContext; + + @Mock + TransactionContext transactionContext; + + TransactionContextWrapper transactionContextWrapper; + + @Before + public void setUp(){ + MockitoAnnotations.initMocks(this); + doReturn(DatastoreContext.newBuilder().build()).when(actorContext).getDatastoreContext(); + transactionContextWrapper = new TransactionContextWrapper(identifier, actorContext); + } + + @Test + public void testExecutePriorTransactionOperations(){ + for(int i=0;i<100;i++) { + transactionContextWrapper.maybeExecuteTransactionOperation(mock(TransactionOperation.class)); + } + assertEquals(901, transactionContextWrapper.getLimiter().availablePermits()); + + transactionContextWrapper.executePriorTransactionOperations(transactionContext); + + assertEquals(1001, transactionContextWrapper.getLimiter().availablePermits()); + } +} \ No newline at end of file diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/TransactionProxyTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/TransactionProxyTest.java index 26d51cbae3..4301a72d18 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/TransactionProxyTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/TransactionProxyTest.java @@ -798,6 +798,10 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest { throttleOperation(operation, 1, true); } + private void throttleOperation(TransactionProxyOperation operation, int outstandingOpsLimit, boolean shardFound){ + throttleOperation(operation, outstandingOpsLimit, shardFound, TimeUnit.SECONDS.toNanos(mockActorContext.getDatastoreContext().getOperationTimeoutInSeconds())); + } + private PrimaryShardInfo newPrimaryShardInfo(ActorRef actorRef){ return new PrimaryShardInfo(getSystem().actorSelection(actorRef.path()), DataStoreVersions.CURRENT_VERSION, Optional.absent()); @@ -809,11 +813,14 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest { } - private void throttleOperation(TransactionProxyOperation operation, int outstandingOpsLimit, boolean shardFound){ + private void throttleOperation(TransactionProxyOperation operation, int outstandingOpsLimit, boolean shardFound, long expectedCompletionTime){ ActorSystem actorSystem = getSystem(); ActorRef shardActorRef = actorSystem.actorOf(Props.create(DoNothingActor.class)); - doReturn(outstandingOpsLimit).when(mockActorContext).getTransactionOutstandingOperationLimit(); + // Note that we setting batchedModificationCount to one less than what we need because in TransactionProxy + // we now allow one extra permit to be allowed for ready + doReturn(dataStoreContextBuilder.operationTimeoutInSeconds(2). + shardBatchedModificationCount(outstandingOpsLimit-1).build()).when(mockActorContext).getDatastoreContext(); doReturn(actorSystem.actorSelection(shardActorRef.path())). when(mockActorContext).actorSelection(shardActorRef.path().toString()); @@ -821,6 +828,9 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest { if(shardFound) { doReturn(Futures.successful(newPrimaryShardInfo(shardActorRef))). when(mockActorContext).findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD)); + doReturn(Futures.successful(newPrimaryShardInfo(shardActorRef))). + when(mockActorContext).findPrimaryShardAsync(eq("cars")); + } else { doReturn(Futures.failed(new Exception("not found"))) .when(mockActorContext).findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD)); @@ -845,9 +855,9 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest { long end = System.nanoTime(); - long expected = TimeUnit.SECONDS.toNanos(mockActorContext.getDatastoreContext().getOperationTimeoutInSeconds()); Assert.assertTrue(String.format("Expected elapsed time: %s. Actual: %s", - expected, (end-start)), (end - start) > expected); + expectedCompletionTime, (end-start)), + ((end - start) > expectedCompletionTime) && ((end - start) < expectedCompletionTime*2)); } @@ -859,8 +869,6 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest { ActorSystem actorSystem = getSystem(); ActorRef shardActorRef = actorSystem.actorOf(Props.create(DoNothingActor.class)); - doReturn(1).when(mockActorContext).getTransactionOutstandingOperationLimit(); - doReturn(actorSystem.actorSelection(shardActorRef.path())). when(mockActorContext).actorSelection(shardActorRef.path().toString()); @@ -903,8 +911,6 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest { ActorSystem actorSystem = getSystem(); ActorRef shardActorRef = actorSystem.actorOf(Props.create(DoNothingActor.class)); - doReturn(1).when(mockActorContext).getTransactionOutstandingOperationLimit(); - doReturn(actorSystem.actorSelection(shardActorRef.path())). when(mockActorContext).actorSelection(shardActorRef.path().toString()); @@ -952,7 +958,6 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest { @Test public void testWriteCompletionForLocalShard(){ - dataStoreContextBuilder.shardBatchedModificationCount(1); completeOperationLocal(new TransactionProxyOperation() { @Override public void run(TransactionProxy transactionProxy) { @@ -968,7 +973,6 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest { @Test public void testWriteThrottlingWhenShardFound(){ - dataStoreContextBuilder.shardBatchedModificationCount(1); throttleOperation(new TransactionProxyOperation() { @Override public void run(TransactionProxy transactionProxy) { @@ -986,7 +990,6 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest { @Test public void testWriteThrottlingWhenShardNotFound(){ // Confirm that there is no throttling when the Shard is not found - dataStoreContextBuilder.shardBatchedModificationCount(1); completeOperation(new TransactionProxyOperation() { @Override public void run(TransactionProxy transactionProxy) { @@ -1005,7 +1008,6 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest { @Test public void testWriteCompletion(){ - dataStoreContextBuilder.shardBatchedModificationCount(1); completeOperation(new TransactionProxyOperation() { @Override public void run(TransactionProxy transactionProxy) { @@ -1022,7 +1024,6 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest { @Test public void testMergeThrottlingWhenShardFound(){ - dataStoreContextBuilder.shardBatchedModificationCount(1); throttleOperation(new TransactionProxyOperation() { @Override public void run(TransactionProxy transactionProxy) { @@ -1039,7 +1040,6 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest { @Test public void testMergeThrottlingWhenShardNotFound(){ - dataStoreContextBuilder.shardBatchedModificationCount(1); completeOperation(new TransactionProxyOperation() { @Override public void run(TransactionProxy transactionProxy) { @@ -1056,7 +1056,6 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest { @Test public void testMergeCompletion(){ - dataStoreContextBuilder.shardBatchedModificationCount(1); completeOperation(new TransactionProxyOperation() { @Override public void run(TransactionProxy transactionProxy) { @@ -1074,7 +1073,6 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest { @Test public void testMergeCompletionForLocalShard(){ - dataStoreContextBuilder.shardBatchedModificationCount(1); completeOperationLocal(new TransactionProxyOperation() { @Override public void run(TransactionProxy transactionProxy) { @@ -1122,7 +1120,6 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest { @Test public void testDeleteCompletionForLocalShard(){ - dataStoreContextBuilder.shardBatchedModificationCount(1); completeOperationLocal(new TransactionProxyOperation() { @Override public void run(TransactionProxy transactionProxy) { @@ -1137,7 +1134,6 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest { @Test public void testDeleteCompletion(){ - dataStoreContextBuilder.shardBatchedModificationCount(1); completeOperation(new TransactionProxyOperation() { @Override public void run(TransactionProxy transactionProxy) { @@ -1326,7 +1322,6 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest { @Test public void testReadyThrottlingWithTwoTransactionContexts(){ - throttleOperation(new TransactionProxyOperation() { @Override public void run(TransactionProxy transactionProxy) { @@ -1340,11 +1335,13 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest { transactionProxy.write(TestModel.TEST_PATH, nodeToWrite); - transactionProxy.write(TestModel.TEST_PATH, carsNode); + // Trying to write to Cars will cause another transaction context to get created + transactionProxy.write(CarsModel.BASE_PATH, carsNode); + // Now ready should block for both transaction contexts transactionProxy.ready(); } - }, 2, true); + }, 1, true, TimeUnit.SECONDS.toNanos(mockActorContext.getDatastoreContext().getOperationTimeoutInSeconds()) * 2); } private void testModificationOperationBatching(TransactionType type) throws Exception { @@ -1526,8 +1523,6 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest { doReturn(memberName).when(mockActorContext).getCurrentMemberName(); - doReturn(10).when(mockActorContext).getTransactionOutstandingOperationLimit(); - doReturn(getSystem().dispatchers().defaultGlobalDispatcher()).when(mockActorContext).getClientDispatcher(); TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_ONLY); -- 2.36.6