Convert CDS implementation to use msdal APIs
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / TransactionContextWrapper.java
index e8dab2c17ebb12e54241543f7ace660e2940009f..a126ce95971bae232c2da0b1f9fb9aa3c550cfe6 100644 (file)
@@ -16,7 +16,7 @@ import java.util.Collection;
 import java.util.List;
 import java.util.concurrent.TimeUnit;
 import javax.annotation.concurrent.GuardedBy;
-import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier;
+import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
 import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -38,8 +38,8 @@ class TransactionContextWrapper {
      */
     @GuardedBy("queuedTxOperations")
     private final List<TransactionOperation> queuedTxOperations = Lists.newArrayList();
-
     private final TransactionIdentifier identifier;
+    private final String shardName;
 
     /**
      * The resulting TransactionContext.
@@ -48,11 +48,14 @@ class TransactionContextWrapper {
 
     private final OperationLimiter limiter;
 
-    TransactionContextWrapper(TransactionIdentifier identifier, final ActorContext actorContext) {
+    TransactionContextWrapper(final TransactionIdentifier identifier, final ActorContext actorContext,
+            final String shardName) {
         this.identifier = Preconditions.checkNotNull(identifier);
         this.limiter = new OperationLimiter(identifier,
-                actorContext.getDatastoreContext().getShardBatchedModificationCount() + 1, // 1 extra permit for the ready operation
+                // 1 extra permit for the ready operation
+                actorContext.getDatastoreContext().getShardBatchedModificationCount() + 1,
                 TimeUnit.MILLISECONDS.toSeconds(actorContext.getDatastoreContext().getOperationTimeoutInMillis()));
+        this.shardName = Preconditions.checkNotNull(shardName);
     }
 
     TransactionContext getTransactionContext() {
@@ -70,7 +73,7 @@ class TransactionContextWrapper {
         final boolean invokeOperation;
         synchronized (queuedTxOperations) {
             if (transactionContext == null) {
-                LOG.debug("Tx {} Queuing TransactionOperation", getIdentifier());
+                LOG.debug("Tx {} Queuing TransactionOperation", identifier);
 
                 queuedTxOperations.add(operation);
                 invokeOperation = false;
@@ -82,7 +85,10 @@ class TransactionContextWrapper {
         if (invokeOperation) {
             operation.invoke(transactionContext);
         } else {
-            limiter.acquire();
+            if (!limiter.acquire()) {
+                LOG.warn("Failed to acquire enqueue operation permit for transaction {} on shard {}", identifier,
+                    shardName);
+            }
         }
     }
 
@@ -98,7 +104,7 @@ class TransactionContextWrapper {
     }
 
     void executePriorTransactionOperations(final TransactionContext localTransactionContext) {
-        while(true) {
+        while (true) {
             // Access to queuedTxOperations and transactionContext must be protected and atomic
             // (ie synchronized) with respect to #addTxOperationOnComplete to handle timing
             // issues and ensure no TransactionOperation is missed and that they are processed
@@ -108,13 +114,13 @@ class TransactionContextWrapper {
             // in case a TransactionOperation results in another transaction operation being
             // queued (eg a put operation from a client read Future callback that is notified
             // synchronously).
-            Collection<TransactionOperation> operationsBatch = null;
+            final Collection<TransactionOperation> operationsBatch;
             synchronized (queuedTxOperations) {
                 if (queuedTxOperations.isEmpty()) {
                     // We're done invoking the TransactionOperations so we can now publish the
                     // TransactionContext.
                     localTransactionContext.operationHandOffComplete();
-                    if(!localTransactionContext.usesOperationLimiting()){
+                    if (!localTransactionContext.usesOperationLimiting()) {
                         limiter.releaseAll();
                     }
                     transactionContext = localTransactionContext;
@@ -143,8 +149,8 @@ class TransactionContextWrapper {
         final Promise<ActorSelection> promise = Futures.promise();
         enqueueTransactionOperation(new TransactionOperation() {
             @Override
-            public void invoke(TransactionContext transactionContext) {
-                promise.completeWith(transactionContext.readyTransaction());
+            public void invoke(TransactionContext newTransactionContext) {
+                promise.completeWith(newTransactionContext.readyTransaction());
             }
         });