X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2FTransactionChainProxy.java;h=93f9e6b7de1e2085f01d36c74a37b84ff7ecb4d2;hp=b467ee4ddbf56c456c2e9f0f62381eefa173e31d;hb=2ce392e287211179691e9ad9c738a6776effacd8;hpb=def3ed5d503108b48ad3e90e451e20b72b5dbd72 diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionChainProxy.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionChainProxy.java index b467ee4ddb..93f9e6b7de 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionChainProxy.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionChainProxy.java @@ -9,18 +9,17 @@ package org.opendaylight.controller.cluster.datastore; import akka.actor.ActorSelection; -import akka.dispatch.Futures; +import akka.dispatch.OnComplete; +import java.util.AbstractMap.SimpleEntry; +import java.util.List; import org.opendaylight.controller.cluster.datastore.messages.CloseTransactionChain; import org.opendaylight.controller.cluster.datastore.utils.ActorContext; import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction; import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction; import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransactionChain; import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction; -import scala.concurrent.Await; import scala.concurrent.Future; - -import java.util.Collections; -import java.util.List; +import scala.concurrent.Promise; /** * TransactionChainProxy acts as a proxy for a DOMStoreTransactionChain created on a remote shard @@ -28,7 +27,7 @@ import java.util.List; public class TransactionChainProxy implements DOMStoreTransactionChain{ private final ActorContext actorContext; private final String transactionChainId; - private volatile List> cohortFutures = Collections.emptyList(); + private volatile SimpleEntry>> previousTxReadyFutures; public TransactionChainProxy(ActorContext actorContext) { this.actorContext = actorContext; @@ -37,20 +36,17 @@ public class TransactionChainProxy implements DOMStoreTransactionChain{ @Override public DOMStoreReadTransaction newReadOnlyTransaction() { - return new TransactionProxy(actorContext, - TransactionProxy.TransactionType.READ_ONLY, this); + return new ChainedTransactionProxy(actorContext, TransactionProxy.TransactionType.READ_ONLY); } @Override public DOMStoreReadWriteTransaction newReadWriteTransaction() { - return new TransactionProxy(actorContext, - TransactionProxy.TransactionType.READ_WRITE, this); + return new ChainedTransactionProxy(actorContext, TransactionProxy.TransactionType.READ_WRITE); } @Override public DOMStoreWriteTransaction newWriteOnlyTransaction() { - return new TransactionProxy(actorContext, - TransactionProxy.TransactionType.WRITE_ONLY, this); + return new ChainedTransactionProxy(actorContext, TransactionProxy.TransactionType.WRITE_ONLY); } @Override @@ -63,17 +59,62 @@ public class TransactionChainProxy implements DOMStoreTransactionChain{ return transactionChainId; } - public void onTransactionReady(List> cohortFutures){ - this.cohortFutures = cohortFutures; - } + private class ChainedTransactionProxy extends TransactionProxy { + + ChainedTransactionProxy(ActorContext actorContext, TransactionType transactionType) { + super(actorContext, transactionType, transactionChainId); + } + + @Override + protected void onTransactionReady(List> cohortFutures) { + if(!cohortFutures.isEmpty()) { + previousTxReadyFutures = new SimpleEntry<>(getIdentifier(), cohortFutures); + } else { + previousTxReadyFutures = null; + } + } + + /** + * This method is overridden to ensure the previous Tx's ready operations complete + * before we create the next shard Tx in the chain to avoid creation failures if the + * previous Tx's ready operations haven't completed yet. + */ + @Override + protected Future sendCreateTransaction(final ActorSelection shard, + final Object serializedCreateMessage) { + // Check if there are any previous ready Futures. Also make sure the previous ready + // Futures aren't for this Tx as deadlock would occur if tried to wait on our own + // Futures. This may happen b/c the shard Tx creates are done async so it's possible + // for the client to ready this Tx before we've even attempted to create a shard Tx. + if(previousTxReadyFutures == null || + previousTxReadyFutures.getKey().equals(getIdentifier())) { + return super.sendCreateTransaction(shard, serializedCreateMessage); + } + + // Combine the ready Futures into 1. + Future> combinedFutures = akka.dispatch.Futures.sequence( + previousTxReadyFutures.getValue(), actorContext.getActorSystem().dispatcher()); + + // Add a callback for completion of the combined Futures. + final Promise createTxPromise = akka.dispatch.Futures.promise(); + OnComplete> onComplete = new OnComplete>() { + @Override + public void onComplete(Throwable failure, Iterable notUsed) { + if(failure != null) { + // A Ready Future failed so fail the returned Promise. + createTxPromise.failure(failure); + } else { + // Send the CreateTx message and use the resulting Future to complete the + // returned Promise. + createTxPromise.completeWith(actorContext.executeOperationAsync(shard, + serializedCreateMessage)); + } + } + }; + + combinedFutures.onComplete(onComplete, actorContext.getActorSystem().dispatcher()); - public void waitTillCurrentTransactionReady(){ - try { - Await.result(Futures - .sequence(this.cohortFutures, actorContext.getActorSystem().dispatcher()), - actorContext.getOperationDuration()); - } catch (Exception e) { - throw new IllegalStateException("Failed when waiting for transaction on a chain to become ready", e); + return createTxPromise.future(); } } }