X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2FTransactionChainProxy.java;h=93f9e6b7de1e2085f01d36c74a37b84ff7ecb4d2;hb=2ce392e287211179691e9ad9c738a6776effacd8;hp=76bbef713c350ad975c9dbb256bdef83bd1e4915;hpb=20d3e2621112fe4bf77b888d57dbdc69f2105d82;p=controller.git diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionChainProxy.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionChainProxy.java index 76bbef713c..93f9e6b7de 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionChainProxy.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionChainProxy.java @@ -8,46 +8,113 @@ package org.opendaylight.controller.cluster.datastore; +import akka.actor.ActorSelection; +import akka.dispatch.OnComplete; +import java.util.AbstractMap.SimpleEntry; +import java.util.List; +import org.opendaylight.controller.cluster.datastore.messages.CloseTransactionChain; import org.opendaylight.controller.cluster.datastore.utils.ActorContext; import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction; import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction; import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransactionChain; import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction; -import org.opendaylight.yangtools.yang.model.api.SchemaContext; +import scala.concurrent.Future; +import scala.concurrent.Promise; /** * TransactionChainProxy acts as a proxy for a DOMStoreTransactionChain created on a remote shard */ public class TransactionChainProxy implements DOMStoreTransactionChain{ private final ActorContext actorContext; - private final SchemaContext schemaContext; + private final String transactionChainId; + private volatile SimpleEntry>> previousTxReadyFutures; - public TransactionChainProxy(ActorContext actorContext, SchemaContext schemaContext) { + public TransactionChainProxy(ActorContext actorContext) { this.actorContext = actorContext; - this.schemaContext = schemaContext; + transactionChainId = actorContext.getCurrentMemberName() + "-" + System.currentTimeMillis(); } @Override public DOMStoreReadTransaction newReadOnlyTransaction() { - return new TransactionProxy(actorContext, - TransactionProxy.TransactionType.READ_ONLY, schemaContext); + return new ChainedTransactionProxy(actorContext, TransactionProxy.TransactionType.READ_ONLY); } @Override public DOMStoreReadWriteTransaction newReadWriteTransaction() { - return new TransactionProxy(actorContext, - TransactionProxy.TransactionType.READ_WRITE, schemaContext); + return new ChainedTransactionProxy(actorContext, TransactionProxy.TransactionType.READ_WRITE); } @Override public DOMStoreWriteTransaction newWriteOnlyTransaction() { - return new TransactionProxy(actorContext, - TransactionProxy.TransactionType.WRITE_ONLY, schemaContext); + return new ChainedTransactionProxy(actorContext, TransactionProxy.TransactionType.WRITE_ONLY); } @Override public void close() { - // FIXME : The problem here is don't know which shard the transaction chain is to be created on ??? - throw new UnsupportedOperationException("close - not sure what to do here?"); + // Send a close transaction chain request to each and every shard + actorContext.broadcast(new CloseTransactionChain(transactionChainId)); + } + + public String getTransactionChainId() { + return transactionChainId; + } + + private class ChainedTransactionProxy extends TransactionProxy { + + ChainedTransactionProxy(ActorContext actorContext, TransactionType transactionType) { + super(actorContext, transactionType, transactionChainId); + } + + @Override + protected void onTransactionReady(List> cohortFutures) { + if(!cohortFutures.isEmpty()) { + previousTxReadyFutures = new SimpleEntry<>(getIdentifier(), cohortFutures); + } else { + previousTxReadyFutures = null; + } + } + + /** + * This method is overridden to ensure the previous Tx's ready operations complete + * before we create the next shard Tx in the chain to avoid creation failures if the + * previous Tx's ready operations haven't completed yet. + */ + @Override + protected Future sendCreateTransaction(final ActorSelection shard, + final Object serializedCreateMessage) { + // Check if there are any previous ready Futures. Also make sure the previous ready + // Futures aren't for this Tx as deadlock would occur if tried to wait on our own + // Futures. This may happen b/c the shard Tx creates are done async so it's possible + // for the client to ready this Tx before we've even attempted to create a shard Tx. + if(previousTxReadyFutures == null || + previousTxReadyFutures.getKey().equals(getIdentifier())) { + return super.sendCreateTransaction(shard, serializedCreateMessage); + } + + // Combine the ready Futures into 1. + Future> combinedFutures = akka.dispatch.Futures.sequence( + previousTxReadyFutures.getValue(), actorContext.getActorSystem().dispatcher()); + + // Add a callback for completion of the combined Futures. + final Promise createTxPromise = akka.dispatch.Futures.promise(); + OnComplete> onComplete = new OnComplete>() { + @Override + public void onComplete(Throwable failure, Iterable notUsed) { + if(failure != null) { + // A Ready Future failed so fail the returned Promise. + createTxPromise.failure(failure); + } else { + // Send the CreateTx message and use the resulting Future to complete the + // returned Promise. + createTxPromise.completeWith(actorContext.executeOperationAsync(shard, + serializedCreateMessage)); + } + } + }; + + combinedFutures.onComplete(onComplete, actorContext.getActorSystem().dispatcher()); + + return createTxPromise.future(); + } } }