X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2FTransactionChainProxy.java;h=58ac1d8b8265bc50fb7d38dea1dd9c1b916211fc;hb=f97618f25dfc073d1de5d883f1794eefdb3e5c16;hp=87959efe8ae2def5684e253f2e0840c7177db838;hpb=0dff5b2c252d8ae9f7f68ff87f14b246122ef81e;p=controller.git diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionChainProxy.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionChainProxy.java index 87959efe8a..58ac1d8b82 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionChainProxy.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionChainProxy.java @@ -104,11 +104,13 @@ public class TransactionChainProxy implements DOMStoreTransactionChain { @Override public DOMStoreReadWriteTransaction newReadWriteTransaction() { + actorContext.acquireTxCreationPermit(); return allocateWriteTransaction(TransactionProxy.TransactionType.READ_WRITE); } @Override public DOMStoreWriteTransaction newWriteOnlyTransaction() { + actorContext.acquireTxCreationPermit(); return allocateWriteTransaction(TransactionProxy.TransactionType.WRITE_ONLY); } @@ -173,45 +175,47 @@ public class TransactionChainProxy implements DOMStoreTransactionChain { /** * This method is overridden to ensure the previous Tx's ready operations complete - * before we create the next shard Tx in the chain to avoid creation failures if the + * before we initiate the next Tx in the chain to avoid creation failures if the * previous Tx's ready operations haven't completed yet. */ @Override - protected Future sendCreateTransaction(final ActorSelection shard, - final Object serializedCreateMessage) { - + protected Future sendFindPrimaryShardAsync(final String shardName) { // Check if there are any previous ready Futures, otherwise let the super class handle it. if(previousReadyFutures.isEmpty()) { - return super.sendCreateTransaction(shard, serializedCreateMessage); + return super.sendFindPrimaryShardAsync(shardName); + } + + if(LOG.isDebugEnabled()) { + LOG.debug("Waiting for {} previous ready futures for Tx {} on chain {}", + previousReadyFutures.size(), getIdentifier(), getTransactionChainId()); } // Combine the ready Futures into 1. Future> combinedFutures = akka.dispatch.Futures.sequence( - previousReadyFutures, getActorContext().getActorSystem().dispatcher()); + previousReadyFutures, getActorContext().getClientDispatcher()); // Add a callback for completion of the combined Futures. - final Promise createTxPromise = akka.dispatch.Futures.promise(); + final Promise returnPromise = akka.dispatch.Futures.promise(); OnComplete> onComplete = new OnComplete>() { @Override public void onComplete(Throwable failure, Iterable notUsed) { if(failure != null) { // A Ready Future failed so fail the returned Promise. - createTxPromise.failure(failure); + returnPromise.failure(failure); } else { - LOG.debug("Previous Tx readied - sending CreateTransaction for {} on chain {}", + LOG.debug("Previous Tx readied - sending FindPrimaryShard for {} on chain {}", getIdentifier(), getTransactionChainId()); - // Send the CreateTx message and use the resulting Future to complete the + // Send the FindPrimaryShard message and use the resulting Future to complete the // returned Promise. - createTxPromise.completeWith(getActorContext().executeOperationAsync(shard, - serializedCreateMessage)); + returnPromise.completeWith(ChainedTransactionProxy.super.sendFindPrimaryShardAsync(shardName)); } } }; - combinedFutures.onComplete(onComplete, getActorContext().getActorSystem().dispatcher()); + combinedFutures.onComplete(onComplete, getActorContext().getClientDispatcher()); - return createTxPromise.future(); + return returnPromise.future(); } } }