X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2FTransactionContextWrapper.java;h=d11ee3e0bd318d6f6cf9476f5694e2626c4b4ba1;hp=26d7ff8b02bf7beef1e8637fd513af26318777b4;hb=57775303636cc55b83430eb2eef3fa589129e8b6;hpb=c7e1ddeaf842ebb696c8dd38c0ca14c925ee31a1 diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionContextWrapper.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionContextWrapper.java index 26d7ff8b02..d11ee3e0bd 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionContextWrapper.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionContextWrapper.java @@ -7,14 +7,22 @@ */ package org.opendaylight.controller.cluster.datastore; -import com.google.common.collect.Lists; +import akka.actor.ActorSelection; +import akka.dispatch.Futures; +import com.google.common.base.Preconditions; +import java.util.AbstractMap.SimpleImmutableEntry; import java.util.ArrayList; import java.util.Collection; import java.util.List; +import java.util.Map.Entry; +import java.util.concurrent.TimeUnit; import javax.annotation.concurrent.GuardedBy; -import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier; +import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier; +import org.opendaylight.controller.cluster.datastore.utils.ActorContext; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import scala.concurrent.Future; +import scala.concurrent.Promise; /** * A helper class that wraps an eventual TransactionContext instance. Operations destined for the target @@ -30,17 +38,28 @@ class TransactionContextWrapper { * The list of transaction operations to execute once the TransactionContext becomes available. */ @GuardedBy("queuedTxOperations") - private final List queuedTxOperations = Lists.newArrayList(); + private final List> queuedTxOperations = new ArrayList<>(); + private final TransactionIdentifier identifier; + private final OperationLimiter limiter; + private final String shardName; /** * The resulting TransactionContext. */ private volatile TransactionContext transactionContext; - - private final TransactionIdentifier identifier; - - TransactionContextWrapper(final TransactionIdentifier identifier) { - this.identifier = identifier; + @GuardedBy("queuedTxOperations") + private TransactionContext deferredTransactionContext; + @GuardedBy("queuedTxOperations") + private boolean pendingEnqueue; + + TransactionContextWrapper(final TransactionIdentifier identifier, final ActorContext actorContext, + final String shardName) { + this.identifier = Preconditions.checkNotNull(identifier); + this.limiter = new OperationLimiter(identifier, + // 1 extra permit for the ready operation + actorContext.getDatastoreContext().getShardBatchedModificationCount() + 1, + TimeUnit.MILLISECONDS.toSeconds(actorContext.getDatastoreContext().getOperationTimeoutInMillis())); + this.shardName = Preconditions.checkNotNull(shardName); } TransactionContext getTransactionContext() { @@ -52,30 +71,77 @@ class TransactionContextWrapper { } /** - * Adds a TransactionOperation to be executed once the TransactionContext becomes available. + * Adds a TransactionOperation to be executed once the TransactionContext becomes available. This method is called + * only after the caller has checked (without synchronizing with executePriorTransactionOperations()) that the + * context is not available. */ private void enqueueTransactionOperation(final TransactionOperation operation) { - final boolean invokeOperation; + // We have three things to do here: + // - synchronize with executePriorTransactionOperations() so that logical operation ordering is maintained + // - acquire a permit for the operation if we still need to enqueue it + // - enqueue the operation + // + // Since each operation needs to acquire a permit exactly once and the limiter is shared between us and the + // TransactionContext, we need to know whether an operation has a permit before we enqueue it. Further + // complications are: + // - this method may be called from the thread invoking executePriorTransactionOperations() + // - user may be violating API contract of using the transaction from a single thread + + // As a first step, we will synchronize on the queue and check if the handoff has completed. While we have + // the lock, we will assert that we will be enqueing another operation. + final TransactionContext contextOnEntry; synchronized (queuedTxOperations) { - if (transactionContext == null) { - LOG.debug("Tx {} Queuing TransactionOperation", getIdentifier()); - - queuedTxOperations.add(operation); - invokeOperation = false; - } else { - invokeOperation = true; + contextOnEntry = transactionContext; + if (contextOnEntry == null) { + Preconditions.checkState(pendingEnqueue == false, "Concurrent access to transaction %s detected", + identifier); + pendingEnqueue = true; } } - if (invokeOperation) { - operation.invoke(transactionContext); + // Short-circuit if there is a context + if (contextOnEntry != null) { + operation.invoke(transactionContext, null); + return; + } + + boolean cleanupEnqueue = true; + TransactionContext finishHandoff = null; + try { + // Acquire the permit, + final boolean havePermit = limiter.acquire(); + if (!havePermit) { + LOG.warn("Failed to acquire enqueue operation permit for transaction {} on shard {}", identifier, + shardName); + } + + // Ready to enqueue, take the lock again and append the operation + synchronized (queuedTxOperations) { + LOG.debug("Tx {} Queuing TransactionOperation", identifier); + queuedTxOperations.add(new SimpleImmutableEntry<>(operation, havePermit)); + pendingEnqueue = false; + cleanupEnqueue = false; + finishHandoff = deferredTransactionContext; + deferredTransactionContext = null; + } + } finally { + if (cleanupEnqueue) { + synchronized (queuedTxOperations) { + pendingEnqueue = false; + finishHandoff = deferredTransactionContext; + deferredTransactionContext = null; + } + } + if (finishHandoff != null) { + executePriorTransactionOperations(finishHandoff); + } } } void maybeExecuteTransactionOperation(final TransactionOperation op) { - - if (transactionContext != null) { - op.invoke(transactionContext); + final TransactionContext localContext = transactionContext; + if (localContext != null) { + op.invoke(localContext, null); } else { // The shard Tx hasn't been created yet so add the Tx operation to the Tx Future // callback to be executed after the Tx is created. @@ -84,7 +150,7 @@ class TransactionContextWrapper { } void executePriorTransactionOperations(final TransactionContext localTransactionContext) { - while(true) { + while (true) { // Access to queuedTxOperations and transactionContext must be protected and atomic // (ie synchronized) with respect to #addTxOperationOnComplete to handle timing // issues and ensure no TransactionOperation is missed and that they are processed @@ -94,25 +160,59 @@ class TransactionContextWrapper { // in case a TransactionOperation results in another transaction operation being // queued (eg a put operation from a client read Future callback that is notified // synchronously). - Collection operationsBatch = null; - synchronized(queuedTxOperations) { - if(queuedTxOperations.isEmpty()) { - // We're done invoking the TransactionOperations so we can now publish the - // TransactionContext. - transactionContext = localTransactionContext; - break; + final Collection> operationsBatch; + synchronized (queuedTxOperations) { + if (queuedTxOperations.isEmpty()) { + if (!pendingEnqueue) { + // We're done invoking the TransactionOperations so we can now publish the TransactionContext. + localTransactionContext.operationHandOffComplete(); + + // This is null-to-non-null transition after which we are releasing the lock and not doing + // any further processing. + transactionContext = localTransactionContext; + } else { + deferredTransactionContext = localTransactionContext; + } + return; } operationsBatch = new ArrayList<>(queuedTxOperations); queuedTxOperations.clear(); } - // Invoke TransactionOperations outside the sync block to avoid unnecessary blocking. - // A slight down-side is that we need to re-acquire the lock below but this should - // be negligible. - for(TransactionOperation oper: operationsBatch) { - oper.invoke(localTransactionContext); + // Invoke TransactionOperations outside the sync block to avoid unnecessary blocking. A slight down-side is + // that we need to re-acquire the lock below but this should be negligible. + for (Entry oper : operationsBatch) { + final Boolean permit = oper.getValue(); + if (permit.booleanValue() && !localTransactionContext.usesOperationLimiting()) { + // If the context is not using limiting we need to release operations as we are queueing them, so + // user threads are not charged for them. + limiter.release(); + } + oper.getKey().invoke(localTransactionContext, permit); } } } + + Future readyTransaction() { + // avoid the creation of a promise and a TransactionOperation + final TransactionContext localContext = transactionContext; + if (localContext != null) { + return localContext.readyTransaction(null); + } + + final Promise promise = Futures.promise(); + enqueueTransactionOperation(new TransactionOperation() { + @Override + public void invoke(final TransactionContext newTransactionContext, final Boolean havePermit) { + promise.completeWith(newTransactionContext.readyTransaction(havePermit)); + } + }); + + return promise.future(); + } + + OperationLimiter getLimiter() { + return limiter; + } }