Elide front-end 3PC for single-shard Tx
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / TransactionProxy.java
index 71799c92d40a9bca75304a196f1c500895a04b09..f12fdd99eab792080069f38bebe1a721a71f1b32 100644 (file)
@@ -354,7 +354,7 @@ public class TransactionProxy extends AbstractDOMStoreTransaction<TransactionIde
     }
 
     @Override
-    public AbstractThreePhaseCommitCohort ready() {
+    public AbstractThreePhaseCommitCohort<?> ready() {
         Preconditions.checkState(transactionType != TransactionType.READ_ONLY,
                 "Read-only transactions cannot be readied");
 
@@ -371,10 +371,55 @@ public class TransactionProxy extends AbstractDOMStoreTransaction<TransactionIde
 
         throttleOperation(txFutureCallbackMap.size());
 
+        final boolean isSingleShard = txFutureCallbackMap.size() == 1;
+        return isSingleShard ? createSingleCommitCohort() : createMultiCommitCohort();
+    }
+
+    @SuppressWarnings({ "rawtypes", "unchecked" })
+    private AbstractThreePhaseCommitCohort<Object> createSingleCommitCohort() {
+        TransactionFutureCallback txFutureCallback = txFutureCallbackMap.values().iterator().next();
+
+        LOG.debug("Tx {} Readying transaction for shard {} on chain {}", getIdentifier(),
+                txFutureCallback.getShardName(), transactionChainId);
+
+        final OperationCallback.Reference operationCallbackRef =
+                new OperationCallback.Reference(OperationCallback.NO_OP_CALLBACK);
+        final TransactionContext transactionContext = txFutureCallback.getTransactionContext();
+        final Future future;
+        if (transactionContext != null) {
+            // avoid the creation of a promise and a TransactionOperation
+            future = getReadyOrDirectCommitFuture(transactionContext, operationCallbackRef);
+        } else {
+            final Promise promise = akka.dispatch.Futures.promise();
+            txFutureCallback.enqueueTransactionOperation(new TransactionOperation() {
+                @Override
+                public void invoke(TransactionContext transactionContext) {
+                    promise.completeWith(getReadyOrDirectCommitFuture(transactionContext, operationCallbackRef));
+                }
+            });
+            future = promise.future();
+        }
+
+        return new SingleCommitCohortProxy(actorContext, future, getIdentifier().toString(), operationCallbackRef);
+    }
+
+    private Future<?> getReadyOrDirectCommitFuture(TransactionContext transactionContext,
+            OperationCallback.Reference operationCallbackRef) {
+        if(transactionContext.supportsDirectCommit()) {
+            TransactionRateLimitingCallback rateLimitingCallback = new TransactionRateLimitingCallback(actorContext);
+            operationCallbackRef.set(rateLimitingCallback);
+            rateLimitingCallback.run();
+            return transactionContext.directCommit();
+        } else {
+            return transactionContext.readyTransaction();
+        }
+    }
+
+    private AbstractThreePhaseCommitCohort<ActorSelection> createMultiCommitCohort() {
         List<Future<ActorSelection>> cohortFutures = new ArrayList<>(txFutureCallbackMap.size());
         for(TransactionFutureCallback txFutureCallback : txFutureCallbackMap.values()) {
 
-            LOG.debug("Tx {} Readying transaction for shard {} chain {}", getIdentifier(),
+            LOG.debug("Tx {} Readying transaction for shard {} on chain {}", getIdentifier(),
                         txFutureCallback.getShardName(), transactionChainId);
 
             final TransactionContext transactionContext = txFutureCallback.getTransactionContext();
@@ -396,8 +441,7 @@ public class TransactionProxy extends AbstractDOMStoreTransaction<TransactionIde
             cohortFutures.add(future);
         }
 
-        return new ThreePhaseCommitCohortProxy(actorContext, cohortFutures,
-            getIdentifier().toString());
+        return new ThreePhaseCommitCohortProxy(actorContext, cohortFutures, getIdentifier().toString());
     }
 
     @Override