BUG 3019 : Fix Operation throttling for modification batching scenarios
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / TransactionProxy.java
index 82258b46a44b237748342b9d43e8373cb04c0dde..f7cb27b07f2c139c857e2c9308372678e7c29afd 100644 (file)
@@ -24,8 +24,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Set;
-import java.util.concurrent.Semaphore;
-import java.util.concurrent.TimeUnit;
 import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier;
 import org.opendaylight.controller.cluster.datastore.shardstrategy.ShardStrategyFactory;
 import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
@@ -57,12 +55,11 @@ public class TransactionProxy extends AbstractDOMStoreTransaction<TransactionIde
     private final AbstractTransactionContextFactory<?> txContextFactory;
     private final TransactionType type;
     private TransactionState state = TransactionState.OPEN;
-    private volatile OperationCompleter operationCompleter;
-    private volatile Semaphore operationLimiter;
 
     @VisibleForTesting
     public TransactionProxy(final AbstractTransactionContextFactory<?> txContextFactory, final TransactionType type) {
-        super(txContextFactory.nextIdentifier(), false);
+        super(txContextFactory.nextIdentifier(), txContextFactory.getActorContext().getDatastoreContext()
+                .isTransactionDebugContextEnabled());
         this.txContextFactory = txContextFactory;
         this.type = Preconditions.checkNotNull(type);
 
@@ -75,8 +72,6 @@ public class TransactionProxy extends AbstractDOMStoreTransaction<TransactionIde
 
         LOG.debug("Tx {} exists {}", getIdentifier(), path);
 
-        throttleOperation();
-
         final SettableFuture<Boolean> proxyFuture = SettableFuture.create();
         TransactionContextWrapper contextAdapter = getContextAdapter(path);
         contextAdapter.maybeExecuteTransactionOperation(new TransactionOperation() {
@@ -98,8 +93,6 @@ public class TransactionProxy extends AbstractDOMStoreTransaction<TransactionIde
         if (YangInstanceIdentifier.EMPTY.equals(path)) {
             return readAllData();
         } else {
-            throttleOperation();
-
             return singleShardRead(shardNameFromIdentifier(path), path);
         }
     }
@@ -149,8 +142,6 @@ public class TransactionProxy extends AbstractDOMStoreTransaction<TransactionIde
 
         LOG.debug("Tx {} delete {}", getIdentifier(), path);
 
-        throttleOperation();
-
         TransactionContextWrapper contextAdapter = getContextAdapter(path);
         contextAdapter.maybeExecuteTransactionOperation(new TransactionOperation() {
             @Override
@@ -166,8 +157,6 @@ public class TransactionProxy extends AbstractDOMStoreTransaction<TransactionIde
 
         LOG.debug("Tx {} merge {}", getIdentifier(), path);
 
-        throttleOperation();
-
         TransactionContextWrapper contextAdapter = getContextAdapter(path);
         contextAdapter.maybeExecuteTransactionOperation(new TransactionOperation() {
             @Override
@@ -183,8 +172,6 @@ public class TransactionProxy extends AbstractDOMStoreTransaction<TransactionIde
 
         LOG.debug("Tx {} write {}", getIdentifier(), path);
 
-        throttleOperation();
-
         TransactionContextWrapper contextAdapter = getContextAdapter(path);
         contextAdapter.maybeExecuteTransactionOperation(new TransactionOperation() {
             @Override
@@ -244,7 +231,6 @@ public class TransactionProxy extends AbstractDOMStoreTransaction<TransactionIde
         final AbstractThreePhaseCommitCohort<?> ret;
         switch (txContextAdapters.size()) {
         case 0:
-            TransactionRateLimitingCallback.adjustRateLimitForUnusedTransaction(txContextFactory.getActorContext());
             ret = NoOpDOMStoreThreePhaseCommitCohort.INSTANCE;
             break;
         case 1:
@@ -256,12 +242,13 @@ public class TransactionProxy extends AbstractDOMStoreTransaction<TransactionIde
         }
 
         txContextFactory.onTransactionReady(getIdentifier(), ret.getCohortFutures());
-        return ret;
+
+        final Throwable debugContext = getDebugContext();
+        return debugContext == null ? ret : new DebugThreePhaseCommitCohort(getIdentifier(), ret, debugContext);
     }
 
     private AbstractThreePhaseCommitCohort<?> createSingleCommitCohort(final String shardName,
             final TransactionContextWrapper contextAdapter) {
-        throttleOperation();
 
         LOG.debug("Tx {} Readying transaction for shard {}", getIdentifier(), shardName);
 
@@ -304,30 +291,11 @@ public class TransactionProxy extends AbstractDOMStoreTransaction<TransactionIde
     private AbstractThreePhaseCommitCohort<ActorSelection> createMultiCommitCohort(
             final Set<Entry<String, TransactionContextWrapper>> txContextAdapterEntries) {
 
-        throttleOperation();
         final List<Future<ActorSelection>> cohortFutures = new ArrayList<>(txContextAdapterEntries.size());
         for (Entry<String, TransactionContextWrapper> e : txContextAdapterEntries) {
             LOG.debug("Tx {} Readying transaction for shard {}", getIdentifier(), e.getKey());
 
-            TransactionContextWrapper contextAdapter = e.getValue();
-            final TransactionContext transactionContext = contextAdapter.getTransactionContext();
-            Future<ActorSelection> future;
-            if (transactionContext != null) {
-                // avoid the creation of a promise and a TransactionOperation
-                future = transactionContext.readyTransaction();
-            } else {
-                final Promise<ActorSelection> promise = akka.dispatch.Futures.promise();
-                contextAdapter.maybeExecuteTransactionOperation(new TransactionOperation() {
-                    @Override
-                    public void invoke(TransactionContext transactionContext) {
-                        promise.completeWith(transactionContext.readyTransaction());
-                    }
-                });
-
-                future = promise.future();
-            }
-
-            cohortFutures.add(future);
+            cohortFutures.add(e.getValue().readyTransaction());
         }
 
         return new ThreePhaseCommitCohortProxy(txContextFactory.getActorContext(), cohortFutures, getIdentifier().toString());
@@ -363,44 +331,4 @@ public class TransactionProxy extends AbstractDOMStoreTransaction<TransactionIde
     ActorContext getActorContext() {
         return txContextFactory.getActorContext();
     }
-
-    OperationCompleter getCompleter() {
-        OperationCompleter ret = operationCompleter;
-        if (ret == null) {
-            final Semaphore s = getLimiter();
-            ret = new OperationCompleter(s);
-            operationCompleter = ret;
-        }
-
-        return ret;
-    }
-
-    Semaphore getLimiter() {
-        Semaphore ret = operationLimiter;
-        if (ret == null) {
-            // Note : Currently mailbox-capacity comes from akka.conf and not from the config-subsystem
-            ret = new Semaphore(getActorContext().getTransactionOutstandingOperationLimit());
-            operationLimiter = ret;
-        }
-        return ret;
-    }
-
-    void throttleOperation() {
-        throttleOperation(1);
-    }
-
-    private void throttleOperation(int acquirePermits) {
-        try {
-            if (!getLimiter().tryAcquire(acquirePermits,
-                getActorContext().getDatastoreContext().getOperationTimeoutInSeconds(), TimeUnit.SECONDS)){
-                LOG.warn("Failed to acquire operation permit for transaction {}", getIdentifier());
-            }
-        } catch (InterruptedException e) {
-            if (LOG.isDebugEnabled()) {
-                LOG.debug("Interrupted when trying to acquire operation permit for transaction {}", getIdentifier(), e);
-            } else {
-                LOG.warn("Interrupted when trying to acquire operation permit for transaction {}", getIdentifier());
-            }
-        }
-    }
 }