X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2FTransactionChainProxy.java;h=cf261cbd2af103b70f3dfce9885903a88ee293e6;hp=87959efe8ae2def5684e253f2e0840c7177db838;hb=107324809285bfbb9890cba38ffa18390f8de4bd;hpb=915a86bcff78e373ae9487e19f5e24828ccc1e9b diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionChainProxy.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionChainProxy.java index 87959efe8a..cf261cbd2a 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionChainProxy.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionChainProxy.java @@ -8,8 +8,6 @@ package org.opendaylight.controller.cluster.datastore; -import akka.actor.ActorSelection; -import akka.dispatch.OnComplete; import com.google.common.base.Preconditions; import java.util.Collections; import java.util.List; @@ -21,22 +19,17 @@ import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction; import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction; import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransactionChain; import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import scala.concurrent.Future; -import scala.concurrent.Promise; /** * TransactionChainProxy acts as a proxy for a DOMStoreTransactionChain created on a remote shard */ public class TransactionChainProxy implements DOMStoreTransactionChain { - private static final Logger LOG = LoggerFactory.getLogger(TransactionChainProxy.class); - private interface State { boolean isReady(); - List> getPreviousReadyFutures(); + List> getPreviousReadyFutures(); } private static class Allocated implements State { @@ -52,14 +45,14 @@ public class TransactionChainProxy implements DOMStoreTransactionChain { } @Override - public List> getPreviousReadyFutures() { + public List> getPreviousReadyFutures() { return transaction.getReadyFutures(); } } private static abstract class AbstractDefaultState implements State { @Override - public List> getPreviousReadyFutures() { + public List> getPreviousReadyFutures() { return Collections.emptyList(); } } @@ -104,11 +97,13 @@ public class TransactionChainProxy implements DOMStoreTransactionChain { @Override public DOMStoreReadWriteTransaction newReadWriteTransaction() { + actorContext.acquireTxCreationPermit(); return allocateWriteTransaction(TransactionProxy.TransactionType.READ_WRITE); } @Override public DOMStoreWriteTransaction newWriteOnlyTransaction() { + actorContext.acquireTxCreationPermit(); return allocateWriteTransaction(TransactionProxy.TransactionType.WRITE_ONLY); } @@ -117,7 +112,7 @@ public class TransactionChainProxy implements DOMStoreTransactionChain { currentState = CLOSED_STATE; // Send a close transaction chain request to each and every shard - actorContext.broadcast(new CloseTransactionChain(transactionChainId)); + actorContext.broadcast(new CloseTransactionChain(transactionChainId).toSerializable()); } private ChainedTransactionProxy allocateWriteTransaction(TransactionProxy.TransactionType type) { @@ -137,81 +132,4 @@ public class TransactionChainProxy implements DOMStoreTransactionChain { private void checkReadyState(State state) { Preconditions.checkState(state.isReady(), "Previous transaction is not ready yet"); } - - private static class ChainedTransactionProxy extends TransactionProxy { - - /** - * Stores the ready Futures from the previous Tx in the chain. - */ - private final List> previousReadyFutures; - - /** - * Stores the ready Futures from this transaction when it is readied. - */ - private volatile List> readyFutures; - - private ChainedTransactionProxy(ActorContext actorContext, TransactionType transactionType, - String transactionChainId, List> previousReadyFutures) { - super(actorContext, transactionType, transactionChainId); - this.previousReadyFutures = previousReadyFutures; - } - - List> getReadyFutures() { - return readyFutures; - } - - boolean isReady() { - return readyFutures != null; - } - - @Override - protected void onTransactionReady(List> readyFutures) { - LOG.debug("onTransactionReady {} pending readyFutures size {} chain {}", getIdentifier(), - readyFutures.size(), getTransactionChainId()); - this.readyFutures = readyFutures; - } - - /** - * This method is overridden to ensure the previous Tx's ready operations complete - * before we create the next shard Tx in the chain to avoid creation failures if the - * previous Tx's ready operations haven't completed yet. - */ - @Override - protected Future sendCreateTransaction(final ActorSelection shard, - final Object serializedCreateMessage) { - - // Check if there are any previous ready Futures, otherwise let the super class handle it. - if(previousReadyFutures.isEmpty()) { - return super.sendCreateTransaction(shard, serializedCreateMessage); - } - - // Combine the ready Futures into 1. - Future> combinedFutures = akka.dispatch.Futures.sequence( - previousReadyFutures, getActorContext().getActorSystem().dispatcher()); - - // Add a callback for completion of the combined Futures. - final Promise createTxPromise = akka.dispatch.Futures.promise(); - OnComplete> onComplete = new OnComplete>() { - @Override - public void onComplete(Throwable failure, Iterable notUsed) { - if(failure != null) { - // A Ready Future failed so fail the returned Promise. - createTxPromise.failure(failure); - } else { - LOG.debug("Previous Tx readied - sending CreateTransaction for {} on chain {}", - getIdentifier(), getTransactionChainId()); - - // Send the CreateTx message and use the resulting Future to complete the - // returned Promise. - createTxPromise.completeWith(getActorContext().executeOperationAsync(shard, - serializedCreateMessage)); - } - } - }; - - combinedFutures.onComplete(onComplete, getActorContext().getActorSystem().dispatcher()); - - return createTxPromise.future(); - } - } }