Convert OperationCompleter to OperationLimiter
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / TransactionProxy.java
index e7a00042e4146c5ddc03a013e6eeda3198a38ea3..5aafcfc88f2eb72ccdef204d938731e8e08b5f63 100644 (file)
@@ -24,8 +24,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Set;
-import java.util.concurrent.Semaphore;
-import java.util.concurrent.TimeUnit;
 import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier;
 import org.opendaylight.controller.cluster.datastore.shardstrategy.ShardStrategyFactory;
 import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
@@ -55,10 +53,9 @@ public class TransactionProxy extends AbstractDOMStoreTransaction<TransactionIde
 
     private final Map<String, TransactionContextWrapper> txContextAdapters = new HashMap<>();
     private final AbstractTransactionContextFactory<?> txContextFactory;
+    private final OperationLimiter limiter;
     private final TransactionType type;
     private TransactionState state = TransactionState.OPEN;
-    private volatile OperationCompleter operationCompleter;
-    private volatile Semaphore operationLimiter;
 
     @VisibleForTesting
     public TransactionProxy(final AbstractTransactionContextFactory<?> txContextFactory, final TransactionType type) {
@@ -67,6 +64,11 @@ public class TransactionProxy extends AbstractDOMStoreTransaction<TransactionIde
         this.txContextFactory = txContextFactory;
         this.type = Preconditions.checkNotNull(type);
 
+        // Note : Currently mailbox-capacity comes from akka.conf and not from the config-subsystem
+        this.limiter = new OperationLimiter(getIdentifier(),
+            getActorContext().getTransactionOutstandingOperationLimit(),
+            getActorContext().getDatastoreContext().getOperationTimeoutInSeconds());
+
         LOG.debug("New {} Tx - {}", type, getIdentifier());
     }
 
@@ -76,7 +78,7 @@ public class TransactionProxy extends AbstractDOMStoreTransaction<TransactionIde
 
         LOG.debug("Tx {} exists {}", getIdentifier(), path);
 
-        throttleOperation();
+        limiter.acquire();
 
         final SettableFuture<Boolean> proxyFuture = SettableFuture.create();
         TransactionContextWrapper contextAdapter = getContextAdapter(path);
@@ -99,7 +101,7 @@ public class TransactionProxy extends AbstractDOMStoreTransaction<TransactionIde
         if (YangInstanceIdentifier.EMPTY.equals(path)) {
             return readAllData();
         } else {
-            throttleOperation();
+            limiter.acquire();
 
             return singleShardRead(shardNameFromIdentifier(path), path);
         }
@@ -150,7 +152,7 @@ public class TransactionProxy extends AbstractDOMStoreTransaction<TransactionIde
 
         LOG.debug("Tx {} delete {}", getIdentifier(), path);
 
-        throttleOperation();
+        limiter.acquire();
 
         TransactionContextWrapper contextAdapter = getContextAdapter(path);
         contextAdapter.maybeExecuteTransactionOperation(new TransactionOperation() {
@@ -167,7 +169,7 @@ public class TransactionProxy extends AbstractDOMStoreTransaction<TransactionIde
 
         LOG.debug("Tx {} merge {}", getIdentifier(), path);
 
-        throttleOperation();
+        limiter.acquire();
 
         TransactionContextWrapper contextAdapter = getContextAdapter(path);
         contextAdapter.maybeExecuteTransactionOperation(new TransactionOperation() {
@@ -184,7 +186,7 @@ public class TransactionProxy extends AbstractDOMStoreTransaction<TransactionIde
 
         LOG.debug("Tx {} write {}", getIdentifier(), path);
 
-        throttleOperation();
+        limiter.acquire();
 
         TransactionContextWrapper contextAdapter = getContextAdapter(path);
         contextAdapter.maybeExecuteTransactionOperation(new TransactionOperation() {
@@ -263,7 +265,7 @@ public class TransactionProxy extends AbstractDOMStoreTransaction<TransactionIde
 
     private AbstractThreePhaseCommitCohort<?> createSingleCommitCohort(final String shardName,
             final TransactionContextWrapper contextAdapter) {
-        throttleOperation();
+        limiter.acquire();
 
         LOG.debug("Tx {} Readying transaction for shard {}", getIdentifier(), shardName);
 
@@ -306,7 +308,7 @@ public class TransactionProxy extends AbstractDOMStoreTransaction<TransactionIde
     private AbstractThreePhaseCommitCohort<ActorSelection> createMultiCommitCohort(
             final Set<Entry<String, TransactionContextWrapper>> txContextAdapterEntries) {
 
-        throttleOperation();
+        limiter.acquire();
         final List<Future<ActorSelection>> cohortFutures = new ArrayList<>(txContextAdapterEntries.size());
         for (Entry<String, TransactionContextWrapper> e : txContextAdapterEntries) {
             LOG.debug("Tx {} Readying transaction for shard {}", getIdentifier(), e.getKey());
@@ -366,43 +368,7 @@ public class TransactionProxy extends AbstractDOMStoreTransaction<TransactionIde
         return txContextFactory.getActorContext();
     }
 
-    OperationCompleter getCompleter() {
-        OperationCompleter ret = operationCompleter;
-        if (ret == null) {
-            final Semaphore s = getLimiter();
-            ret = new OperationCompleter(s);
-            operationCompleter = ret;
-        }
-
-        return ret;
-    }
-
-    Semaphore getLimiter() {
-        Semaphore ret = operationLimiter;
-        if (ret == null) {
-            // Note : Currently mailbox-capacity comes from akka.conf and not from the config-subsystem
-            ret = new Semaphore(getActorContext().getTransactionOutstandingOperationLimit());
-            operationLimiter = ret;
-        }
-        return ret;
-    }
-
-    void throttleOperation() {
-        throttleOperation(1);
-    }
-
-    private void throttleOperation(int acquirePermits) {
-        try {
-            if (!getLimiter().tryAcquire(acquirePermits,
-                getActorContext().getDatastoreContext().getOperationTimeoutInSeconds(), TimeUnit.SECONDS)){
-                LOG.warn("Failed to acquire operation permit for transaction {}", getIdentifier());
-            }
-        } catch (InterruptedException e) {
-            if (LOG.isDebugEnabled()) {
-                LOG.debug("Interrupted when trying to acquire operation permit for transaction {}", getIdentifier(), e);
-            } else {
-                LOG.warn("Interrupted when trying to acquire operation permit for transaction {}", getIdentifier());
-            }
-        }
+    OperationLimiter getLimiter() {
+        return limiter;
     }
 }