Modify ChainedTransactionProxy to override sending of FindPrimaryShard
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / TransactionChainProxy.java
index 87959efe8ae2def5684e253f2e0840c7177db838..58ac1d8b8265bc50fb7d38dea1dd9c1b916211fc 100644 (file)
@@ -104,11 +104,13 @@ public class TransactionChainProxy implements DOMStoreTransactionChain {
 
     @Override
     public DOMStoreReadWriteTransaction newReadWriteTransaction() {
+        actorContext.acquireTxCreationPermit();
         return allocateWriteTransaction(TransactionProxy.TransactionType.READ_WRITE);
     }
 
     @Override
     public DOMStoreWriteTransaction newWriteOnlyTransaction() {
+        actorContext.acquireTxCreationPermit();
         return allocateWriteTransaction(TransactionProxy.TransactionType.WRITE_ONLY);
     }
 
@@ -173,45 +175,47 @@ public class TransactionChainProxy implements DOMStoreTransactionChain {
 
         /**
          * This method is overridden to ensure the previous Tx's ready operations complete
-         * before we create the next shard Tx in the chain to avoid creation failures if the
+         * before we initiate the next Tx in the chain to avoid creation failures if the
          * previous Tx's ready operations haven't completed yet.
          */
         @Override
-        protected Future<Object> sendCreateTransaction(final ActorSelection shard,
-                final Object serializedCreateMessage) {
-
+        protected Future<ActorSelection> sendFindPrimaryShardAsync(final String shardName) {
             // Check if there are any previous ready Futures, otherwise let the super class handle it.
             if(previousReadyFutures.isEmpty()) {
-                return super.sendCreateTransaction(shard, serializedCreateMessage);
+                return super.sendFindPrimaryShardAsync(shardName);
+            }
+
+            if(LOG.isDebugEnabled()) {
+                LOG.debug("Waiting for {} previous ready futures for Tx {} on chain {}",
+                        previousReadyFutures.size(), getIdentifier(), getTransactionChainId());
             }
 
             // Combine the ready Futures into 1.
             Future<Iterable<ActorSelection>> combinedFutures = akka.dispatch.Futures.sequence(
-                    previousReadyFutures, getActorContext().getActorSystem().dispatcher());
+                    previousReadyFutures, getActorContext().getClientDispatcher());
 
             // Add a callback for completion of the combined Futures.
-            final Promise<Object> createTxPromise = akka.dispatch.Futures.promise();
+            final Promise<ActorSelection> returnPromise = akka.dispatch.Futures.promise();
             OnComplete<Iterable<ActorSelection>> onComplete = new OnComplete<Iterable<ActorSelection>>() {
                 @Override
                 public void onComplete(Throwable failure, Iterable<ActorSelection> notUsed) {
                     if(failure != null) {
                         // A Ready Future failed so fail the returned Promise.
-                        createTxPromise.failure(failure);
+                        returnPromise.failure(failure);
                     } else {
-                        LOG.debug("Previous Tx readied - sending CreateTransaction for {} on chain {}",
+                        LOG.debug("Previous Tx readied - sending FindPrimaryShard for {} on chain {}",
                                 getIdentifier(), getTransactionChainId());
 
-                        // Send the CreateTx message and use the resulting Future to complete the
+                        // Send the FindPrimaryShard message and use the resulting Future to complete the
                         // returned Promise.
-                        createTxPromise.completeWith(getActorContext().executeOperationAsync(shard,
-                                serializedCreateMessage));
+                        returnPromise.completeWith(ChainedTransactionProxy.super.sendFindPrimaryShardAsync(shardName));
                     }
                 }
             };
 
-            combinedFutures.onComplete(onComplete, getActorContext().getActorSystem().dispatcher());
+            combinedFutures.onComplete(onComplete, getActorContext().getClientDispatcher());
 
-            return createTxPromise.future();
+            return returnPromise.future();
         }
     }
 }