X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2FTransactionContextWrapper.java;h=2facfbd1a20a29a7d65719c4fba31ee0c4875977;hb=55a9b9f42a14c56060f74b38f84d444c0fbfecc4;hp=137f6529c710f37d5e8c57beaa7cd8892336307d;hpb=daaef05cbf70e6cbec9af181258faead6d9620a6;p=controller.git diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionContextWrapper.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionContextWrapper.java index 137f6529c7..2facfbd1a2 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionContextWrapper.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionContextWrapper.java @@ -7,14 +7,26 @@ */ package org.opendaylight.controller.cluster.datastore; -import com.google.common.collect.Lists; +import static com.google.common.base.Preconditions.checkState; +import static java.util.Objects.requireNonNull; + +import akka.actor.ActorSelection; +import akka.dispatch.Futures; +import java.util.AbstractMap.SimpleImmutableEntry; import java.util.ArrayList; import java.util.Collection; import java.util.List; -import javax.annotation.concurrent.GuardedBy; -import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier; +import java.util.Map.Entry; +import java.util.Optional; +import java.util.SortedSet; +import java.util.concurrent.TimeUnit; +import org.checkerframework.checker.lock.qual.GuardedBy; +import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier; +import org.opendaylight.controller.cluster.datastore.utils.ActorUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import scala.concurrent.Future; +import scala.concurrent.Promise; /** * A helper class that wraps an eventual TransactionContext instance. Operations destined for the target @@ -30,17 +42,28 @@ class TransactionContextWrapper { * The list of transaction operations to execute once the TransactionContext becomes available. */ @GuardedBy("queuedTxOperations") - private final List queuedTxOperations = Lists.newArrayList(); + private final List> queuedTxOperations = new ArrayList<>(); + private final TransactionIdentifier identifier; + private final OperationLimiter limiter; + private final String shardName; /** * The resulting TransactionContext. */ private volatile TransactionContext transactionContext; - - private final TransactionIdentifier identifier; - - TransactionContextWrapper(TransactionIdentifier identifier) { - this.identifier = identifier; + @GuardedBy("queuedTxOperations") + private TransactionContext deferredTransactionContext; + @GuardedBy("queuedTxOperations") + private boolean pendingEnqueue; + + TransactionContextWrapper(final TransactionIdentifier identifier, final ActorUtils actorUtils, + final String shardName) { + this.identifier = requireNonNull(identifier); + this.limiter = new OperationLimiter(identifier, + // 1 extra permit for the ready operation + actorUtils.getDatastoreContext().getShardBatchedModificationCount() + 1, + TimeUnit.MILLISECONDS.toSeconds(actorUtils.getDatastoreContext().getOperationTimeoutInMillis())); + this.shardName = requireNonNull(shardName); } TransactionContext getTransactionContext() { @@ -52,28 +75,76 @@ class TransactionContextWrapper { } /** - * Adds a TransactionOperation to be executed once the TransactionContext becomes available. + * Adds a TransactionOperation to be executed once the TransactionContext becomes available. This method is called + * only after the caller has checked (without synchronizing with executePriorTransactionOperations()) that the + * context is not available. */ - private void enqueueTransactionOperation(TransactionOperation operation) { - boolean invokeOperation = true; - synchronized(queuedTxOperations) { - if(transactionContext == null) { - LOG.debug("Tx {} Queuing TransactionOperation", getIdentifier()); - - invokeOperation = false; - queuedTxOperations.add(operation); + private void enqueueTransactionOperation(final TransactionOperation operation) { + // We have three things to do here: + // - synchronize with executePriorTransactionOperations() so that logical operation ordering is maintained + // - acquire a permit for the operation if we still need to enqueue it + // - enqueue the operation + // + // Since each operation needs to acquire a permit exactly once and the limiter is shared between us and the + // TransactionContext, we need to know whether an operation has a permit before we enqueue it. Further + // complications are: + // - this method may be called from the thread invoking executePriorTransactionOperations() + // - user may be violating API contract of using the transaction from a single thread + + // As a first step, we will synchronize on the queue and check if the handoff has completed. While we have + // the lock, we will assert that we will be enqueing another operation. + final TransactionContext contextOnEntry; + synchronized (queuedTxOperations) { + contextOnEntry = transactionContext; + if (contextOnEntry == null) { + checkState(pendingEnqueue == false, "Concurrent access to transaction %s detected", identifier); + pendingEnqueue = true; } } - if(invokeOperation) { - operation.invoke(transactionContext); + // Short-circuit if there is a context + if (contextOnEntry != null) { + operation.invoke(transactionContext, null); + return; + } + + boolean cleanupEnqueue = true; + TransactionContext finishHandoff = null; + try { + // Acquire the permit, + final boolean havePermit = limiter.acquire(); + if (!havePermit) { + LOG.warn("Failed to acquire enqueue operation permit for transaction {} on shard {}", identifier, + shardName); + } + + // Ready to enqueue, take the lock again and append the operation + synchronized (queuedTxOperations) { + LOG.debug("Tx {} Queuing TransactionOperation", identifier); + queuedTxOperations.add(new SimpleImmutableEntry<>(operation, havePermit)); + pendingEnqueue = false; + cleanupEnqueue = false; + finishHandoff = deferredTransactionContext; + deferredTransactionContext = null; + } + } finally { + if (cleanupEnqueue) { + synchronized (queuedTxOperations) { + pendingEnqueue = false; + finishHandoff = deferredTransactionContext; + deferredTransactionContext = null; + } + } + if (finishHandoff != null) { + executePriorTransactionOperations(finishHandoff); + } } } void maybeExecuteTransactionOperation(final TransactionOperation op) { - - if (transactionContext != null) { - op.invoke(transactionContext); + final TransactionContext localContext = transactionContext; + if (localContext != null) { + op.invoke(localContext, null); } else { // The shard Tx hasn't been created yet so add the Tx operation to the Tx Future // callback to be executed after the Tx is created. @@ -81,8 +152,8 @@ class TransactionContextWrapper { } } - void executePriorTransactionOperations(TransactionContext localTransactionContext) { - while(true) { + void executePriorTransactionOperations(final TransactionContext localTransactionContext) { + while (true) { // Access to queuedTxOperations and transactionContext must be protected and atomic // (ie synchronized) with respect to #addTxOperationOnComplete to handle timing // issues and ensure no TransactionOperation is missed and that they are processed @@ -92,25 +163,59 @@ class TransactionContextWrapper { // in case a TransactionOperation results in another transaction operation being // queued (eg a put operation from a client read Future callback that is notified // synchronously). - Collection operationsBatch = null; - synchronized(queuedTxOperations) { - if(queuedTxOperations.isEmpty()) { - // We're done invoking the TransactionOperations so we can now publish the - // TransactionContext. - transactionContext = localTransactionContext; - break; + final Collection> operationsBatch; + synchronized (queuedTxOperations) { + if (queuedTxOperations.isEmpty()) { + if (!pendingEnqueue) { + // We're done invoking the TransactionOperations so we can now publish the TransactionContext. + localTransactionContext.operationHandOffComplete(); + + // This is null-to-non-null transition after which we are releasing the lock and not doing + // any further processing. + transactionContext = localTransactionContext; + } else { + deferredTransactionContext = localTransactionContext; + } + return; } operationsBatch = new ArrayList<>(queuedTxOperations); queuedTxOperations.clear(); } - // Invoke TransactionOperations outside the sync block to avoid unnecessary blocking. - // A slight down-side is that we need to re-acquire the lock below but this should - // be negligible. - for(TransactionOperation oper: operationsBatch) { - oper.invoke(localTransactionContext); + // Invoke TransactionOperations outside the sync block to avoid unnecessary blocking. A slight down-side is + // that we need to re-acquire the lock below but this should be negligible. + for (Entry oper : operationsBatch) { + final Boolean permit = oper.getValue(); + if (permit.booleanValue() && !localTransactionContext.usesOperationLimiting()) { + // If the context is not using limiting we need to release operations as we are queueing them, so + // user threads are not charged for them. + limiter.release(); + } + oper.getKey().invoke(localTransactionContext, permit); } } } + + Future readyTransaction(Optional> participatingShardNames) { + // avoid the creation of a promise and a TransactionOperation + final TransactionContext localContext = transactionContext; + if (localContext != null) { + return localContext.readyTransaction(null, participatingShardNames); + } + + final Promise promise = Futures.promise(); + enqueueTransactionOperation(new TransactionOperation() { + @Override + public void invoke(final TransactionContext newTransactionContext, final Boolean havePermit) { + promise.completeWith(newTransactionContext.readyTransaction(havePermit, participatingShardNames)); + } + }); + + return promise.future(); + } + + OperationLimiter getLimiter() { + return limiter; + } }