X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2FTransactionChainProxy.java;h=87959efe8ae2def5684e253f2e0840c7177db838;hp=93f9e6b7de1e2085f01d36c74a37b84ff7ecb4d2;hb=37f0504d391efd8b7d61403759fcc22a6dd3a093;hpb=53e56a0aae13a444cf14b20240104b77f2e02b1b diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionChainProxy.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionChainProxy.java index 93f9e6b7de..87959efe8a 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionChainProxy.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionChainProxy.java @@ -10,68 +10,165 @@ package org.opendaylight.controller.cluster.datastore; import akka.actor.ActorSelection; import akka.dispatch.OnComplete; -import java.util.AbstractMap.SimpleEntry; +import com.google.common.base.Preconditions; +import java.util.Collections; import java.util.List; +import java.util.concurrent.atomic.AtomicInteger; import org.opendaylight.controller.cluster.datastore.messages.CloseTransactionChain; import org.opendaylight.controller.cluster.datastore.utils.ActorContext; +import org.opendaylight.controller.md.sal.common.api.data.TransactionChainClosedException; import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction; import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction; import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransactionChain; import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import scala.concurrent.Future; import scala.concurrent.Promise; /** * TransactionChainProxy acts as a proxy for a DOMStoreTransactionChain created on a remote shard */ -public class TransactionChainProxy implements DOMStoreTransactionChain{ +public class TransactionChainProxy implements DOMStoreTransactionChain { + + private static final Logger LOG = LoggerFactory.getLogger(TransactionChainProxy.class); + + private interface State { + boolean isReady(); + + List> getPreviousReadyFutures(); + } + + private static class Allocated implements State { + private final ChainedTransactionProxy transaction; + + Allocated(ChainedTransactionProxy transaction) { + this.transaction = transaction; + } + + @Override + public boolean isReady() { + return transaction.isReady(); + } + + @Override + public List> getPreviousReadyFutures() { + return transaction.getReadyFutures(); + } + } + + private static abstract class AbstractDefaultState implements State { + @Override + public List> getPreviousReadyFutures() { + return Collections.emptyList(); + } + } + + private static final State IDLE_STATE = new AbstractDefaultState() { + @Override + public boolean isReady() { + return true; + } + }; + + private static final State CLOSED_STATE = new AbstractDefaultState() { + @Override + public boolean isReady() { + throw new TransactionChainClosedException("Transaction chain has been closed"); + } + }; + + private static final AtomicInteger counter = new AtomicInteger(0); + private final ActorContext actorContext; private final String transactionChainId; - private volatile SimpleEntry>> previousTxReadyFutures; + private volatile State currentState = IDLE_STATE; public TransactionChainProxy(ActorContext actorContext) { this.actorContext = actorContext; - transactionChainId = actorContext.getCurrentMemberName() + "-" + System.currentTimeMillis(); + transactionChainId = actorContext.getCurrentMemberName() + "-txn-chain-" + counter.incrementAndGet(); + } + + public String getTransactionChainId() { + return transactionChainId; } @Override public DOMStoreReadTransaction newReadOnlyTransaction() { - return new ChainedTransactionProxy(actorContext, TransactionProxy.TransactionType.READ_ONLY); + State localState = currentState; + checkReadyState(localState); + + return new ChainedTransactionProxy(actorContext, TransactionProxy.TransactionType.READ_ONLY, + transactionChainId, localState.getPreviousReadyFutures()); } @Override public DOMStoreReadWriteTransaction newReadWriteTransaction() { - return new ChainedTransactionProxy(actorContext, TransactionProxy.TransactionType.READ_WRITE); + return allocateWriteTransaction(TransactionProxy.TransactionType.READ_WRITE); } @Override public DOMStoreWriteTransaction newWriteOnlyTransaction() { - return new ChainedTransactionProxy(actorContext, TransactionProxy.TransactionType.WRITE_ONLY); + return allocateWriteTransaction(TransactionProxy.TransactionType.WRITE_ONLY); } @Override public void close() { + currentState = CLOSED_STATE; + // Send a close transaction chain request to each and every shard actorContext.broadcast(new CloseTransactionChain(transactionChainId)); } - public String getTransactionChainId() { - return transactionChainId; + private ChainedTransactionProxy allocateWriteTransaction(TransactionProxy.TransactionType type) { + State localState = currentState; + + checkReadyState(localState); + + // Pass the ready Futures from the previous Tx. + ChainedTransactionProxy txProxy = new ChainedTransactionProxy(actorContext, type, + transactionChainId, localState.getPreviousReadyFutures()); + + currentState = new Allocated(txProxy); + + return txProxy; + } + + private void checkReadyState(State state) { + Preconditions.checkState(state.isReady(), "Previous transaction is not ready yet"); } - private class ChainedTransactionProxy extends TransactionProxy { + private static class ChainedTransactionProxy extends TransactionProxy { - ChainedTransactionProxy(ActorContext actorContext, TransactionType transactionType) { + /** + * Stores the ready Futures from the previous Tx in the chain. + */ + private final List> previousReadyFutures; + + /** + * Stores the ready Futures from this transaction when it is readied. + */ + private volatile List> readyFutures; + + private ChainedTransactionProxy(ActorContext actorContext, TransactionType transactionType, + String transactionChainId, List> previousReadyFutures) { super(actorContext, transactionType, transactionChainId); + this.previousReadyFutures = previousReadyFutures; + } + + List> getReadyFutures() { + return readyFutures; + } + + boolean isReady() { + return readyFutures != null; } @Override - protected void onTransactionReady(List> cohortFutures) { - if(!cohortFutures.isEmpty()) { - previousTxReadyFutures = new SimpleEntry<>(getIdentifier(), cohortFutures); - } else { - previousTxReadyFutures = null; - } + protected void onTransactionReady(List> readyFutures) { + LOG.debug("onTransactionReady {} pending readyFutures size {} chain {}", getIdentifier(), + readyFutures.size(), getTransactionChainId()); + this.readyFutures = readyFutures; } /** @@ -82,18 +179,15 @@ public class TransactionChainProxy implements DOMStoreTransactionChain{ @Override protected Future sendCreateTransaction(final ActorSelection shard, final Object serializedCreateMessage) { - // Check if there are any previous ready Futures. Also make sure the previous ready - // Futures aren't for this Tx as deadlock would occur if tried to wait on our own - // Futures. This may happen b/c the shard Tx creates are done async so it's possible - // for the client to ready this Tx before we've even attempted to create a shard Tx. - if(previousTxReadyFutures == null || - previousTxReadyFutures.getKey().equals(getIdentifier())) { + + // Check if there are any previous ready Futures, otherwise let the super class handle it. + if(previousReadyFutures.isEmpty()) { return super.sendCreateTransaction(shard, serializedCreateMessage); } // Combine the ready Futures into 1. Future> combinedFutures = akka.dispatch.Futures.sequence( - previousTxReadyFutures.getValue(), actorContext.getActorSystem().dispatcher()); + previousReadyFutures, getActorContext().getActorSystem().dispatcher()); // Add a callback for completion of the combined Futures. final Promise createTxPromise = akka.dispatch.Futures.promise(); @@ -104,15 +198,18 @@ public class TransactionChainProxy implements DOMStoreTransactionChain{ // A Ready Future failed so fail the returned Promise. createTxPromise.failure(failure); } else { + LOG.debug("Previous Tx readied - sending CreateTransaction for {} on chain {}", + getIdentifier(), getTransactionChainId()); + // Send the CreateTx message and use the resulting Future to complete the // returned Promise. - createTxPromise.completeWith(actorContext.executeOperationAsync(shard, + createTxPromise.completeWith(getActorContext().executeOperationAsync(shard, serializedCreateMessage)); } } }; - combinedFutures.onComplete(onComplete, actorContext.getActorSystem().dispatcher()); + combinedFutures.onComplete(onComplete, getActorContext().getActorSystem().dispatcher()); return createTxPromise.future(); }