BUG 3019 : Fix Operation throttling for modification batching scenarios
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / TransactionProxy.java
index 5aafcfc88f2eb72ccdef204d938731e8e08b5f63..f7cb27b07f2c139c857e2c9308372678e7c29afd 100644 (file)
@@ -53,7 +53,6 @@ public class TransactionProxy extends AbstractDOMStoreTransaction<TransactionIde
 
     private final Map<String, TransactionContextWrapper> txContextAdapters = new HashMap<>();
     private final AbstractTransactionContextFactory<?> txContextFactory;
-    private final OperationLimiter limiter;
     private final TransactionType type;
     private TransactionState state = TransactionState.OPEN;
 
@@ -64,11 +63,6 @@ public class TransactionProxy extends AbstractDOMStoreTransaction<TransactionIde
         this.txContextFactory = txContextFactory;
         this.type = Preconditions.checkNotNull(type);
 
-        // Note : Currently mailbox-capacity comes from akka.conf and not from the config-subsystem
-        this.limiter = new OperationLimiter(getIdentifier(),
-            getActorContext().getTransactionOutstandingOperationLimit(),
-            getActorContext().getDatastoreContext().getOperationTimeoutInSeconds());
-
         LOG.debug("New {} Tx - {}", type, getIdentifier());
     }
 
@@ -78,8 +72,6 @@ public class TransactionProxy extends AbstractDOMStoreTransaction<TransactionIde
 
         LOG.debug("Tx {} exists {}", getIdentifier(), path);
 
-        limiter.acquire();
-
         final SettableFuture<Boolean> proxyFuture = SettableFuture.create();
         TransactionContextWrapper contextAdapter = getContextAdapter(path);
         contextAdapter.maybeExecuteTransactionOperation(new TransactionOperation() {
@@ -101,8 +93,6 @@ public class TransactionProxy extends AbstractDOMStoreTransaction<TransactionIde
         if (YangInstanceIdentifier.EMPTY.equals(path)) {
             return readAllData();
         } else {
-            limiter.acquire();
-
             return singleShardRead(shardNameFromIdentifier(path), path);
         }
     }
@@ -152,8 +142,6 @@ public class TransactionProxy extends AbstractDOMStoreTransaction<TransactionIde
 
         LOG.debug("Tx {} delete {}", getIdentifier(), path);
 
-        limiter.acquire();
-
         TransactionContextWrapper contextAdapter = getContextAdapter(path);
         contextAdapter.maybeExecuteTransactionOperation(new TransactionOperation() {
             @Override
@@ -169,8 +157,6 @@ public class TransactionProxy extends AbstractDOMStoreTransaction<TransactionIde
 
         LOG.debug("Tx {} merge {}", getIdentifier(), path);
 
-        limiter.acquire();
-
         TransactionContextWrapper contextAdapter = getContextAdapter(path);
         contextAdapter.maybeExecuteTransactionOperation(new TransactionOperation() {
             @Override
@@ -186,8 +172,6 @@ public class TransactionProxy extends AbstractDOMStoreTransaction<TransactionIde
 
         LOG.debug("Tx {} write {}", getIdentifier(), path);
 
-        limiter.acquire();
-
         TransactionContextWrapper contextAdapter = getContextAdapter(path);
         contextAdapter.maybeExecuteTransactionOperation(new TransactionOperation() {
             @Override
@@ -265,7 +249,6 @@ public class TransactionProxy extends AbstractDOMStoreTransaction<TransactionIde
 
     private AbstractThreePhaseCommitCohort<?> createSingleCommitCohort(final String shardName,
             final TransactionContextWrapper contextAdapter) {
-        limiter.acquire();
 
         LOG.debug("Tx {} Readying transaction for shard {}", getIdentifier(), shardName);
 
@@ -308,30 +291,11 @@ public class TransactionProxy extends AbstractDOMStoreTransaction<TransactionIde
     private AbstractThreePhaseCommitCohort<ActorSelection> createMultiCommitCohort(
             final Set<Entry<String, TransactionContextWrapper>> txContextAdapterEntries) {
 
-        limiter.acquire();
         final List<Future<ActorSelection>> cohortFutures = new ArrayList<>(txContextAdapterEntries.size());
         for (Entry<String, TransactionContextWrapper> e : txContextAdapterEntries) {
             LOG.debug("Tx {} Readying transaction for shard {}", getIdentifier(), e.getKey());
 
-            TransactionContextWrapper contextAdapter = e.getValue();
-            final TransactionContext transactionContext = contextAdapter.getTransactionContext();
-            Future<ActorSelection> future;
-            if (transactionContext != null) {
-                // avoid the creation of a promise and a TransactionOperation
-                future = transactionContext.readyTransaction();
-            } else {
-                final Promise<ActorSelection> promise = akka.dispatch.Futures.promise();
-                contextAdapter.maybeExecuteTransactionOperation(new TransactionOperation() {
-                    @Override
-                    public void invoke(TransactionContext transactionContext) {
-                        promise.completeWith(transactionContext.readyTransaction());
-                    }
-                });
-
-                future = promise.future();
-            }
-
-            cohortFutures.add(future);
+            cohortFutures.add(e.getValue().readyTransaction());
         }
 
         return new ThreePhaseCommitCohortProxy(txContextFactory.getActorContext(), cohortFutures, getIdentifier().toString());
@@ -367,8 +331,4 @@ public class TransactionProxy extends AbstractDOMStoreTransaction<TransactionIde
     ActorContext getActorContext() {
         return txContextFactory.getActorContext();
     }
-
-    OperationLimiter getLimiter() {
-        return limiter;
-    }
 }