X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2FTransactionContextWrapper.java;h=a126ce95971bae232c2da0b1f9fb9aa3c550cfe6;hb=f41c5e6e6f6e10b36b1e4b1992877e38e718c8fb;hp=e8dab2c17ebb12e54241543f7ace660e2940009f;hpb=5105751d47439e5d71d3a3b8035e4afd262c1890;p=controller.git diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionContextWrapper.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionContextWrapper.java index e8dab2c17e..a126ce9597 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionContextWrapper.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionContextWrapper.java @@ -16,7 +16,7 @@ import java.util.Collection; import java.util.List; import java.util.concurrent.TimeUnit; import javax.annotation.concurrent.GuardedBy; -import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier; +import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier; import org.opendaylight.controller.cluster.datastore.utils.ActorContext; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -38,8 +38,8 @@ class TransactionContextWrapper { */ @GuardedBy("queuedTxOperations") private final List queuedTxOperations = Lists.newArrayList(); - private final TransactionIdentifier identifier; + private final String shardName; /** * The resulting TransactionContext. @@ -48,11 +48,14 @@ class TransactionContextWrapper { private final OperationLimiter limiter; - TransactionContextWrapper(TransactionIdentifier identifier, final ActorContext actorContext) { + TransactionContextWrapper(final TransactionIdentifier identifier, final ActorContext actorContext, + final String shardName) { this.identifier = Preconditions.checkNotNull(identifier); this.limiter = new OperationLimiter(identifier, - actorContext.getDatastoreContext().getShardBatchedModificationCount() + 1, // 1 extra permit for the ready operation + // 1 extra permit for the ready operation + actorContext.getDatastoreContext().getShardBatchedModificationCount() + 1, TimeUnit.MILLISECONDS.toSeconds(actorContext.getDatastoreContext().getOperationTimeoutInMillis())); + this.shardName = Preconditions.checkNotNull(shardName); } TransactionContext getTransactionContext() { @@ -70,7 +73,7 @@ class TransactionContextWrapper { final boolean invokeOperation; synchronized (queuedTxOperations) { if (transactionContext == null) { - LOG.debug("Tx {} Queuing TransactionOperation", getIdentifier()); + LOG.debug("Tx {} Queuing TransactionOperation", identifier); queuedTxOperations.add(operation); invokeOperation = false; @@ -82,7 +85,10 @@ class TransactionContextWrapper { if (invokeOperation) { operation.invoke(transactionContext); } else { - limiter.acquire(); + if (!limiter.acquire()) { + LOG.warn("Failed to acquire enqueue operation permit for transaction {} on shard {}", identifier, + shardName); + } } } @@ -98,7 +104,7 @@ class TransactionContextWrapper { } void executePriorTransactionOperations(final TransactionContext localTransactionContext) { - while(true) { + while (true) { // Access to queuedTxOperations and transactionContext must be protected and atomic // (ie synchronized) with respect to #addTxOperationOnComplete to handle timing // issues and ensure no TransactionOperation is missed and that they are processed @@ -108,13 +114,13 @@ class TransactionContextWrapper { // in case a TransactionOperation results in another transaction operation being // queued (eg a put operation from a client read Future callback that is notified // synchronously). - Collection operationsBatch = null; + final Collection operationsBatch; synchronized (queuedTxOperations) { if (queuedTxOperations.isEmpty()) { // We're done invoking the TransactionOperations so we can now publish the // TransactionContext. localTransactionContext.operationHandOffComplete(); - if(!localTransactionContext.usesOperationLimiting()){ + if (!localTransactionContext.usesOperationLimiting()) { limiter.releaseAll(); } transactionContext = localTransactionContext; @@ -143,8 +149,8 @@ class TransactionContextWrapper { final Promise promise = Futures.promise(); enqueueTransactionOperation(new TransactionOperation() { @Override - public void invoke(TransactionContext transactionContext) { - promise.completeWith(transactionContext.readyTransaction()); + public void invoke(TransactionContext newTransactionContext) { + promise.completeWith(newTransactionContext.readyTransaction()); } });