X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2FTransactionChainProxy.java;h=cf261cbd2af103b70f3dfce9885903a88ee293e6;hp=58ac1d8b8265bc50fb7d38dea1dd9c1b916211fc;hb=107324809285bfbb9890cba38ffa18390f8de4bd;hpb=874a18a9ce5dc09bc49922754bf8fb3e981fffb9 diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionChainProxy.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionChainProxy.java index 58ac1d8b82..cf261cbd2a 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionChainProxy.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionChainProxy.java @@ -8,8 +8,6 @@ package org.opendaylight.controller.cluster.datastore; -import akka.actor.ActorSelection; -import akka.dispatch.OnComplete; import com.google.common.base.Preconditions; import java.util.Collections; import java.util.List; @@ -21,22 +19,17 @@ import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction; import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction; import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransactionChain; import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import scala.concurrent.Future; -import scala.concurrent.Promise; /** * TransactionChainProxy acts as a proxy for a DOMStoreTransactionChain created on a remote shard */ public class TransactionChainProxy implements DOMStoreTransactionChain { - private static final Logger LOG = LoggerFactory.getLogger(TransactionChainProxy.class); - private interface State { boolean isReady(); - List> getPreviousReadyFutures(); + List> getPreviousReadyFutures(); } private static class Allocated implements State { @@ -52,14 +45,14 @@ public class TransactionChainProxy implements DOMStoreTransactionChain { } @Override - public List> getPreviousReadyFutures() { + public List> getPreviousReadyFutures() { return transaction.getReadyFutures(); } } private static abstract class AbstractDefaultState implements State { @Override - public List> getPreviousReadyFutures() { + public List> getPreviousReadyFutures() { return Collections.emptyList(); } } @@ -119,7 +112,7 @@ public class TransactionChainProxy implements DOMStoreTransactionChain { currentState = CLOSED_STATE; // Send a close transaction chain request to each and every shard - actorContext.broadcast(new CloseTransactionChain(transactionChainId)); + actorContext.broadcast(new CloseTransactionChain(transactionChainId).toSerializable()); } private ChainedTransactionProxy allocateWriteTransaction(TransactionProxy.TransactionType type) { @@ -139,83 +132,4 @@ public class TransactionChainProxy implements DOMStoreTransactionChain { private void checkReadyState(State state) { Preconditions.checkState(state.isReady(), "Previous transaction is not ready yet"); } - - private static class ChainedTransactionProxy extends TransactionProxy { - - /** - * Stores the ready Futures from the previous Tx in the chain. - */ - private final List> previousReadyFutures; - - /** - * Stores the ready Futures from this transaction when it is readied. - */ - private volatile List> readyFutures; - - private ChainedTransactionProxy(ActorContext actorContext, TransactionType transactionType, - String transactionChainId, List> previousReadyFutures) { - super(actorContext, transactionType, transactionChainId); - this.previousReadyFutures = previousReadyFutures; - } - - List> getReadyFutures() { - return readyFutures; - } - - boolean isReady() { - return readyFutures != null; - } - - @Override - protected void onTransactionReady(List> readyFutures) { - LOG.debug("onTransactionReady {} pending readyFutures size {} chain {}", getIdentifier(), - readyFutures.size(), getTransactionChainId()); - this.readyFutures = readyFutures; - } - - /** - * This method is overridden to ensure the previous Tx's ready operations complete - * before we initiate the next Tx in the chain to avoid creation failures if the - * previous Tx's ready operations haven't completed yet. - */ - @Override - protected Future sendFindPrimaryShardAsync(final String shardName) { - // Check if there are any previous ready Futures, otherwise let the super class handle it. - if(previousReadyFutures.isEmpty()) { - return super.sendFindPrimaryShardAsync(shardName); - } - - if(LOG.isDebugEnabled()) { - LOG.debug("Waiting for {} previous ready futures for Tx {} on chain {}", - previousReadyFutures.size(), getIdentifier(), getTransactionChainId()); - } - - // Combine the ready Futures into 1. - Future> combinedFutures = akka.dispatch.Futures.sequence( - previousReadyFutures, getActorContext().getClientDispatcher()); - - // Add a callback for completion of the combined Futures. - final Promise returnPromise = akka.dispatch.Futures.promise(); - OnComplete> onComplete = new OnComplete>() { - @Override - public void onComplete(Throwable failure, Iterable notUsed) { - if(failure != null) { - // A Ready Future failed so fail the returned Promise. - returnPromise.failure(failure); - } else { - LOG.debug("Previous Tx readied - sending FindPrimaryShard for {} on chain {}", - getIdentifier(), getTransactionChainId()); - - // Send the FindPrimaryShard message and use the resulting Future to complete the - // returned Promise. - returnPromise.completeWith(ChainedTransactionProxy.super.sendFindPrimaryShardAsync(shardName)); - } - } - }; - - combinedFutures.onComplete(onComplete, getActorContext().getClientDispatcher()); - - return returnPromise.future(); - } - } }