Merge "Return local shard's DataTree from ActorContext#findPrimaryShardAsync"
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / TransactionProxy.java
index 71799c92d40a9bca75304a196f1c500895a04b09..e397ab501c064adf98c5ee6c2f6708f05eb6f2fe 100644 (file)
@@ -31,6 +31,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 import org.opendaylight.controller.cluster.datastore.compat.PreLithiumTransactionContextImpl;
 import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier;
+import org.opendaylight.controller.cluster.datastore.messages.PrimaryShardInfo;
 import org.opendaylight.controller.cluster.datastore.shardstrategy.ShardStrategyFactory;
 import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
 import org.opendaylight.controller.cluster.datastore.utils.NormalizedNodeAggregator;
@@ -159,7 +160,7 @@ public class TransactionProxy extends AbstractDOMStoreTransaction<TransactionIde
         return false;
     }
 
-    private boolean isRootPath(YangInstanceIdentifier path){
+    private static boolean isRootPath(YangInstanceIdentifier path) {
         return !path.getPathArguments().iterator().hasNext();
     }
 
@@ -354,7 +355,7 @@ public class TransactionProxy extends AbstractDOMStoreTransaction<TransactionIde
     }
 
     @Override
-    public AbstractThreePhaseCommitCohort ready() {
+    public AbstractThreePhaseCommitCohort<?> ready() {
         Preconditions.checkState(transactionType != TransactionType.READ_ONLY,
                 "Read-only transactions cannot be readied");
 
@@ -371,10 +372,55 @@ public class TransactionProxy extends AbstractDOMStoreTransaction<TransactionIde
 
         throttleOperation(txFutureCallbackMap.size());
 
+        final boolean isSingleShard = txFutureCallbackMap.size() == 1;
+        return isSingleShard ? createSingleCommitCohort() : createMultiCommitCohort();
+    }
+
+    @SuppressWarnings({ "rawtypes", "unchecked" })
+    private AbstractThreePhaseCommitCohort<Object> createSingleCommitCohort() {
+        TransactionFutureCallback txFutureCallback = txFutureCallbackMap.values().iterator().next();
+
+        LOG.debug("Tx {} Readying transaction for shard {} on chain {}", getIdentifier(),
+                txFutureCallback.getShardName(), transactionChainId);
+
+        final OperationCallback.Reference operationCallbackRef =
+                new OperationCallback.Reference(OperationCallback.NO_OP_CALLBACK);
+        final TransactionContext transactionContext = txFutureCallback.getTransactionContext();
+        final Future future;
+        if (transactionContext != null) {
+            // avoid the creation of a promise and a TransactionOperation
+            future = getReadyOrDirectCommitFuture(transactionContext, operationCallbackRef);
+        } else {
+            final Promise promise = akka.dispatch.Futures.promise();
+            txFutureCallback.enqueueTransactionOperation(new TransactionOperation() {
+                @Override
+                public void invoke(TransactionContext transactionContext) {
+                    promise.completeWith(getReadyOrDirectCommitFuture(transactionContext, operationCallbackRef));
+                }
+            });
+            future = promise.future();
+        }
+
+        return new SingleCommitCohortProxy(actorContext, future, getIdentifier().toString(), operationCallbackRef);
+    }
+
+    private Future<?> getReadyOrDirectCommitFuture(TransactionContext transactionContext,
+            OperationCallback.Reference operationCallbackRef) {
+        if(transactionContext.supportsDirectCommit()) {
+            TransactionRateLimitingCallback rateLimitingCallback = new TransactionRateLimitingCallback(actorContext);
+            operationCallbackRef.set(rateLimitingCallback);
+            rateLimitingCallback.run();
+            return transactionContext.directCommit();
+        } else {
+            return transactionContext.readyTransaction();
+        }
+    }
+
+    private AbstractThreePhaseCommitCohort<ActorSelection> createMultiCommitCohort() {
         List<Future<ActorSelection>> cohortFutures = new ArrayList<>(txFutureCallbackMap.size());
         for(TransactionFutureCallback txFutureCallback : txFutureCallbackMap.values()) {
 
-            LOG.debug("Tx {} Readying transaction for shard {} chain {}", getIdentifier(),
+            LOG.debug("Tx {} Readying transaction for shard {} on chain {}", getIdentifier(),
                         txFutureCallback.getShardName(), transactionChainId);
 
             final TransactionContext transactionContext = txFutureCallback.getTransactionContext();
@@ -396,8 +442,7 @@ public class TransactionProxy extends AbstractDOMStoreTransaction<TransactionIde
             cohortFutures.add(future);
         }
 
-        return new ThreePhaseCommitCohortProxy(actorContext, cohortFutures,
-            getIdentifier().toString());
+        return new ThreePhaseCommitCohortProxy(actorContext, cohortFutures, getIdentifier().toString());
     }
 
     @Override
@@ -433,7 +478,7 @@ public class TransactionProxy extends AbstractDOMStoreTransaction<TransactionIde
         return ShardStrategyFactory.getStrategy(path).findShard(path);
     }
 
-    protected Future<ActorSelection> sendFindPrimaryShardAsync(String shardName) {
+    protected Future<PrimaryShardInfo> sendFindPrimaryShardAsync(String shardName) {
         return actorContext.findPrimaryShardAsync(shardName);
     }
 
@@ -453,20 +498,20 @@ public class TransactionProxy extends AbstractDOMStoreTransaction<TransactionIde
     private TransactionFutureCallback getOrCreateTxFutureCallback(String shardName) {
         TransactionFutureCallback txFutureCallback = txFutureCallbackMap.get(shardName);
         if(txFutureCallback == null) {
-            Future<ActorSelection> findPrimaryFuture = sendFindPrimaryShardAsync(shardName);
+            Future<PrimaryShardInfo> findPrimaryFuture = sendFindPrimaryShardAsync(shardName);
 
             final TransactionFutureCallback newTxFutureCallback = new TransactionFutureCallback(this, shardName);
 
             txFutureCallback = newTxFutureCallback;
             txFutureCallbackMap.put(shardName, txFutureCallback);
 
-            findPrimaryFuture.onComplete(new OnComplete<ActorSelection>() {
+            findPrimaryFuture.onComplete(new OnComplete<PrimaryShardInfo>() {
                 @Override
-                public void onComplete(Throwable failure, ActorSelection primaryShard) {
+                public void onComplete(Throwable failure, PrimaryShardInfo primaryShardInfo) {
                     if(failure != null) {
                         newTxFutureCallback.createTransactionContext(failure, null);
                     } else {
-                        newTxFutureCallback.setPrimaryShard(primaryShard);
+                        newTxFutureCallback.setPrimaryShard(primaryShardInfo.getPrimaryShardActor());
                     }
                 }
             }, actorContext.getClientDispatcher());