CDS: do not propagate SchemaContext
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / TransactionProxy.java
index a0987cd5d63943979985527794dbe8471c0afe78..c9f1f752068506becc15163a528dbf501905f064 100644 (file)
@@ -31,6 +31,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 import org.opendaylight.controller.cluster.datastore.compat.PreLithiumTransactionContextImpl;
 import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier;
+import org.opendaylight.controller.cluster.datastore.messages.PrimaryShardInfo;
 import org.opendaylight.controller.cluster.datastore.shardstrategy.ShardStrategyFactory;
 import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
 import org.opendaylight.controller.cluster.datastore.utils.NormalizedNodeAggregator;
@@ -40,6 +41,7 @@ import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransactio
 import org.opendaylight.yangtools.util.concurrent.MappingCheckedFuture;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataValidationFailedException;
 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -158,7 +160,7 @@ public class TransactionProxy extends AbstractDOMStoreTransaction<TransactionIde
         return false;
     }
 
-    private boolean isRootPath(YangInstanceIdentifier path){
+    private static boolean isRootPath(YangInstanceIdentifier path) {
         return !path.getPathArguments().iterator().hasNext();
     }
 
@@ -219,7 +221,7 @@ public class TransactionProxy extends AbstractDOMStoreTransaction<TransactionIde
                 try {
                     proxyFuture.set(NormalizedNodeAggregator.aggregate(YangInstanceIdentifier.builder().build(),
                             future.get(), actorContext.getSchemaContext()));
-                } catch (InterruptedException | ExecutionException e) {
+                } catch (DataValidationFailedException | InterruptedException | ExecutionException e) {
                     proxyFuture.setException(e);
                 }
             }
@@ -353,7 +355,7 @@ public class TransactionProxy extends AbstractDOMStoreTransaction<TransactionIde
     }
 
     @Override
-    public AbstractThreePhaseCommitCohort ready() {
+    public AbstractThreePhaseCommitCohort<?> ready() {
         Preconditions.checkState(transactionType != TransactionType.READ_ONLY,
                 "Read-only transactions cannot be readied");
 
@@ -370,10 +372,55 @@ public class TransactionProxy extends AbstractDOMStoreTransaction<TransactionIde
 
         throttleOperation(txFutureCallbackMap.size());
 
+        final boolean isSingleShard = txFutureCallbackMap.size() == 1;
+        return isSingleShard ? createSingleCommitCohort() : createMultiCommitCohort();
+    }
+
+    @SuppressWarnings({ "rawtypes", "unchecked" })
+    private AbstractThreePhaseCommitCohort<Object> createSingleCommitCohort() {
+        TransactionFutureCallback txFutureCallback = txFutureCallbackMap.values().iterator().next();
+
+        LOG.debug("Tx {} Readying transaction for shard {} on chain {}", getIdentifier(),
+                txFutureCallback.getShardName(), transactionChainId);
+
+        final OperationCallback.Reference operationCallbackRef =
+                new OperationCallback.Reference(OperationCallback.NO_OP_CALLBACK);
+        final TransactionContext transactionContext = txFutureCallback.getTransactionContext();
+        final Future future;
+        if (transactionContext != null) {
+            // avoid the creation of a promise and a TransactionOperation
+            future = getReadyOrDirectCommitFuture(transactionContext, operationCallbackRef);
+        } else {
+            final Promise promise = akka.dispatch.Futures.promise();
+            txFutureCallback.enqueueTransactionOperation(new TransactionOperation() {
+                @Override
+                public void invoke(TransactionContext transactionContext) {
+                    promise.completeWith(getReadyOrDirectCommitFuture(transactionContext, operationCallbackRef));
+                }
+            });
+            future = promise.future();
+        }
+
+        return new SingleCommitCohortProxy(actorContext, future, getIdentifier().toString(), operationCallbackRef);
+    }
+
+    private Future<?> getReadyOrDirectCommitFuture(TransactionContext transactionContext,
+            OperationCallback.Reference operationCallbackRef) {
+        if(transactionContext.supportsDirectCommit()) {
+            TransactionRateLimitingCallback rateLimitingCallback = new TransactionRateLimitingCallback(actorContext);
+            operationCallbackRef.set(rateLimitingCallback);
+            rateLimitingCallback.run();
+            return transactionContext.directCommit();
+        } else {
+            return transactionContext.readyTransaction();
+        }
+    }
+
+    private AbstractThreePhaseCommitCohort<ActorSelection> createMultiCommitCohort() {
         List<Future<ActorSelection>> cohortFutures = new ArrayList<>(txFutureCallbackMap.size());
         for(TransactionFutureCallback txFutureCallback : txFutureCallbackMap.values()) {
 
-            LOG.debug("Tx {} Readying transaction for shard {} chain {}", getIdentifier(),
+            LOG.debug("Tx {} Readying transaction for shard {} on chain {}", getIdentifier(),
                         txFutureCallback.getShardName(), transactionChainId);
 
             final TransactionContext transactionContext = txFutureCallback.getTransactionContext();
@@ -395,8 +442,7 @@ public class TransactionProxy extends AbstractDOMStoreTransaction<TransactionIde
             cohortFutures.add(future);
         }
 
-        return new ThreePhaseCommitCohortProxy(actorContext, cohortFutures,
-            getIdentifier().toString());
+        return new ThreePhaseCommitCohortProxy(actorContext, cohortFutures, getIdentifier().toString());
     }
 
     @Override
@@ -432,7 +478,7 @@ public class TransactionProxy extends AbstractDOMStoreTransaction<TransactionIde
         return ShardStrategyFactory.getStrategy(path).findShard(path);
     }
 
-    protected Future<ActorSelection> sendFindPrimaryShardAsync(String shardName) {
+    protected Future<PrimaryShardInfo> sendFindPrimaryShardAsync(String shardName) {
         return actorContext.findPrimaryShardAsync(shardName);
     }
 
@@ -452,20 +498,20 @@ public class TransactionProxy extends AbstractDOMStoreTransaction<TransactionIde
     private TransactionFutureCallback getOrCreateTxFutureCallback(String shardName) {
         TransactionFutureCallback txFutureCallback = txFutureCallbackMap.get(shardName);
         if(txFutureCallback == null) {
-            Future<ActorSelection> findPrimaryFuture = sendFindPrimaryShardAsync(shardName);
+            Future<PrimaryShardInfo> findPrimaryFuture = sendFindPrimaryShardAsync(shardName);
 
             final TransactionFutureCallback newTxFutureCallback = new TransactionFutureCallback(this, shardName);
 
             txFutureCallback = newTxFutureCallback;
             txFutureCallbackMap.put(shardName, txFutureCallback);
 
-            findPrimaryFuture.onComplete(new OnComplete<ActorSelection>() {
+            findPrimaryFuture.onComplete(new OnComplete<PrimaryShardInfo>() {
                 @Override
-                public void onComplete(Throwable failure, ActorSelection primaryShard) {
+                public void onComplete(Throwable failure, PrimaryShardInfo primaryShardInfo) {
                     if(failure != null) {
                         newTxFutureCallback.createTransactionContext(failure, null);
                     } else {
-                        newTxFutureCallback.setPrimaryShard(primaryShard);
+                        newTxFutureCallback.setPrimaryShard(primaryShardInfo.getPrimaryShardActor());
                     }
                 }
             }, actorContext.getClientDispatcher());
@@ -512,11 +558,11 @@ public class TransactionProxy extends AbstractDOMStoreTransaction<TransactionIde
 
         if(remoteTransactionVersion < DataStoreVersions.LITHIUM_VERSION) {
             return new PreLithiumTransactionContextImpl(transactionPath, transactionActor, getIdentifier(),
-                    transactionChainId, actorContext, schemaContext, isTxActorLocal, remoteTransactionVersion,
+                    transactionChainId, actorContext, isTxActorLocal, remoteTransactionVersion,
                     operationCompleter);
         } else {
             return new TransactionContextImpl(transactionActor, getIdentifier(), transactionChainId,
-                    actorContext, schemaContext, isTxActorLocal, remoteTransactionVersion, operationCompleter);
+                    actorContext, isTxActorLocal, remoteTransactionVersion, operationCompleter);
         }
     }
 }