X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2FTransactionContextWrapper.java;h=2facfbd1a20a29a7d65719c4fba31ee0c4875977;hb=2a6aa1775604906755883f810ee9ea6d5f286135;hp=0e1260962d37d807f53233729f3f999e996faed9;hpb=123afd8b015173c459f4937c84eb2e91286f65a8;p=controller.git diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionContextWrapper.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionContextWrapper.java index 0e1260962d..2facfbd1a2 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionContextWrapper.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionContextWrapper.java @@ -7,18 +7,22 @@ */ package org.opendaylight.controller.cluster.datastore; +import static com.google.common.base.Preconditions.checkState; +import static java.util.Objects.requireNonNull; + import akka.actor.ActorSelection; import akka.dispatch.Futures; -import com.google.common.base.Preconditions; import java.util.AbstractMap.SimpleImmutableEntry; import java.util.ArrayList; import java.util.Collection; import java.util.List; import java.util.Map.Entry; +import java.util.Optional; +import java.util.SortedSet; import java.util.concurrent.TimeUnit; -import javax.annotation.concurrent.GuardedBy; +import org.checkerframework.checker.lock.qual.GuardedBy; import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier; -import org.opendaylight.controller.cluster.datastore.utils.ActorContext; +import org.opendaylight.controller.cluster.datastore.utils.ActorUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import scala.concurrent.Future; @@ -52,14 +56,14 @@ class TransactionContextWrapper { @GuardedBy("queuedTxOperations") private boolean pendingEnqueue; - TransactionContextWrapper(final TransactionIdentifier identifier, final ActorContext actorContext, + TransactionContextWrapper(final TransactionIdentifier identifier, final ActorUtils actorUtils, final String shardName) { - this.identifier = Preconditions.checkNotNull(identifier); + this.identifier = requireNonNull(identifier); this.limiter = new OperationLimiter(identifier, // 1 extra permit for the ready operation - actorContext.getDatastoreContext().getShardBatchedModificationCount() + 1, - TimeUnit.MILLISECONDS.toSeconds(actorContext.getDatastoreContext().getOperationTimeoutInMillis())); - this.shardName = Preconditions.checkNotNull(shardName); + actorUtils.getDatastoreContext().getShardBatchedModificationCount() + 1, + TimeUnit.MILLISECONDS.toSeconds(actorUtils.getDatastoreContext().getOperationTimeoutInMillis())); + this.shardName = requireNonNull(shardName); } TransactionContext getTransactionContext() { @@ -93,8 +97,7 @@ class TransactionContextWrapper { synchronized (queuedTxOperations) { contextOnEntry = transactionContext; if (contextOnEntry == null) { - Preconditions.checkState(pendingEnqueue == false, "Concurrent access to transaction %s detected", - identifier); + checkState(pendingEnqueue == false, "Concurrent access to transaction %s detected", identifier); pendingEnqueue = true; } } @@ -166,9 +169,6 @@ class TransactionContextWrapper { if (!pendingEnqueue) { // We're done invoking the TransactionOperations so we can now publish the TransactionContext. localTransactionContext.operationHandOffComplete(); - if (!localTransactionContext.usesOperationLimiting()) { - limiter.releaseAll(); - } // This is null-to-non-null transition after which we are releasing the lock and not doing // any further processing. @@ -183,27 +183,32 @@ class TransactionContextWrapper { queuedTxOperations.clear(); } - // Invoke TransactionOperations outside the sync block to avoid unnecessary blocking. - // A slight down-side is that we need to re-acquire the lock below but this should - // be negligible. + // Invoke TransactionOperations outside the sync block to avoid unnecessary blocking. A slight down-side is + // that we need to re-acquire the lock below but this should be negligible. for (Entry oper : operationsBatch) { - oper.getKey().invoke(localTransactionContext, oper.getValue()); + final Boolean permit = oper.getValue(); + if (permit.booleanValue() && !localTransactionContext.usesOperationLimiting()) { + // If the context is not using limiting we need to release operations as we are queueing them, so + // user threads are not charged for them. + limiter.release(); + } + oper.getKey().invoke(localTransactionContext, permit); } } } - Future readyTransaction() { + Future readyTransaction(Optional> participatingShardNames) { // avoid the creation of a promise and a TransactionOperation final TransactionContext localContext = transactionContext; if (localContext != null) { - return localContext.readyTransaction(null); + return localContext.readyTransaction(null, participatingShardNames); } final Promise promise = Futures.promise(); enqueueTransactionOperation(new TransactionOperation() { @Override public void invoke(final TransactionContext newTransactionContext, final Boolean havePermit) { - promise.completeWith(newTransactionContext.readyTransaction(havePermit)); + promise.completeWith(newTransactionContext.readyTransaction(havePermit, participatingShardNames)); } });