X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2FTransactionChainProxy.java;h=11066edd543413de08591102ba2541d7baec9a0f;hb=e2d9f9c57e124d46e117f17c44b77c89222fdb99;hp=58ac1d8b8265bc50fb7d38dea1dd9c1b916211fc;hpb=8b380afb0506b102531571a6b4d7470a7baa44a7;p=controller.git diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionChainProxy.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionChainProxy.java index 58ac1d8b82..11066edd54 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionChainProxy.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionChainProxy.java @@ -9,7 +9,6 @@ package org.opendaylight.controller.cluster.datastore; import akka.actor.ActorSelection; -import akka.dispatch.OnComplete; import com.google.common.base.Preconditions; import java.util.Collections; import java.util.List; @@ -21,18 +20,13 @@ import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction; import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction; import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransactionChain; import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import scala.concurrent.Future; -import scala.concurrent.Promise; /** * TransactionChainProxy acts as a proxy for a DOMStoreTransactionChain created on a remote shard */ public class TransactionChainProxy implements DOMStoreTransactionChain { - private static final Logger LOG = LoggerFactory.getLogger(TransactionChainProxy.class); - private interface State { boolean isReady(); @@ -139,83 +133,4 @@ public class TransactionChainProxy implements DOMStoreTransactionChain { private void checkReadyState(State state) { Preconditions.checkState(state.isReady(), "Previous transaction is not ready yet"); } - - private static class ChainedTransactionProxy extends TransactionProxy { - - /** - * Stores the ready Futures from the previous Tx in the chain. - */ - private final List> previousReadyFutures; - - /** - * Stores the ready Futures from this transaction when it is readied. - */ - private volatile List> readyFutures; - - private ChainedTransactionProxy(ActorContext actorContext, TransactionType transactionType, - String transactionChainId, List> previousReadyFutures) { - super(actorContext, transactionType, transactionChainId); - this.previousReadyFutures = previousReadyFutures; - } - - List> getReadyFutures() { - return readyFutures; - } - - boolean isReady() { - return readyFutures != null; - } - - @Override - protected void onTransactionReady(List> readyFutures) { - LOG.debug("onTransactionReady {} pending readyFutures size {} chain {}", getIdentifier(), - readyFutures.size(), getTransactionChainId()); - this.readyFutures = readyFutures; - } - - /** - * This method is overridden to ensure the previous Tx's ready operations complete - * before we initiate the next Tx in the chain to avoid creation failures if the - * previous Tx's ready operations haven't completed yet. - */ - @Override - protected Future sendFindPrimaryShardAsync(final String shardName) { - // Check if there are any previous ready Futures, otherwise let the super class handle it. - if(previousReadyFutures.isEmpty()) { - return super.sendFindPrimaryShardAsync(shardName); - } - - if(LOG.isDebugEnabled()) { - LOG.debug("Waiting for {} previous ready futures for Tx {} on chain {}", - previousReadyFutures.size(), getIdentifier(), getTransactionChainId()); - } - - // Combine the ready Futures into 1. - Future> combinedFutures = akka.dispatch.Futures.sequence( - previousReadyFutures, getActorContext().getClientDispatcher()); - - // Add a callback for completion of the combined Futures. - final Promise returnPromise = akka.dispatch.Futures.promise(); - OnComplete> onComplete = new OnComplete>() { - @Override - public void onComplete(Throwable failure, Iterable notUsed) { - if(failure != null) { - // A Ready Future failed so fail the returned Promise. - returnPromise.failure(failure); - } else { - LOG.debug("Previous Tx readied - sending FindPrimaryShard for {} on chain {}", - getIdentifier(), getTransactionChainId()); - - // Send the FindPrimaryShard message and use the resulting Future to complete the - // returned Promise. - returnPromise.completeWith(ChainedTransactionProxy.super.sendFindPrimaryShardAsync(shardName)); - } - } - }; - - combinedFutures.onComplete(onComplete, getActorContext().getClientDispatcher()); - - return returnPromise.future(); - } - } }