X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2FTransactionChainProxy.java;h=cf261cbd2af103b70f3dfce9885903a88ee293e6;hp=2e671e3ce2f267807837225c95376847c6116eb8;hb=107324809285bfbb9890cba38ffa18390f8de4bd;hpb=4dd0108a91d84a7133ef6b781911e87903981bc1 diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionChainProxy.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionChainProxy.java index 2e671e3ce2..cf261cbd2a 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionChainProxy.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionChainProxy.java @@ -8,14 +8,10 @@ package org.opendaylight.controller.cluster.datastore; -import akka.actor.ActorSelection; -import akka.dispatch.OnComplete; import com.google.common.base.Preconditions; -import java.util.AbstractMap.SimpleEntry; import java.util.Collections; import java.util.List; import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; import org.opendaylight.controller.cluster.datastore.messages.CloseTransactionChain; import org.opendaylight.controller.cluster.datastore.utils.ActorContext; import org.opendaylight.controller.md.sal.common.api.data.TransactionChainClosedException; @@ -23,54 +19,41 @@ import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction; import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction; import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransactionChain; import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import scala.concurrent.Future; -import scala.concurrent.Promise; /** * TransactionChainProxy acts as a proxy for a DOMStoreTransactionChain created on a remote shard */ public class TransactionChainProxy implements DOMStoreTransactionChain { - private static final Logger LOG = LoggerFactory.getLogger(TransactionChainProxy.class); - private interface State { boolean isReady(); - SimpleEntry>> getReadyFutures(); - - void setReadyFutures(Object txIdentifier, List> readyFutures); + List> getPreviousReadyFutures(); } private static class Allocated implements State { - private volatile SimpleEntry>> readyFutures; + private final ChainedTransactionProxy transaction; - @Override - public boolean isReady() { - return readyFutures != null; + Allocated(ChainedTransactionProxy transaction) { + this.transaction = transaction; } @Override - public SimpleEntry>> getReadyFutures() { - return readyFutures != null ? readyFutures : EMPTY_READY_FUTURES; + public boolean isReady() { + return transaction.isReady(); } @Override - public void setReadyFutures(Object txIdentifier, List> readyFutures) { - this.readyFutures = new SimpleEntry<>(txIdentifier, readyFutures); + public List> getPreviousReadyFutures() { + return transaction.getReadyFutures(); } } private static abstract class AbstractDefaultState implements State { @Override - public SimpleEntry>> getReadyFutures() { - return EMPTY_READY_FUTURES; - } - - @Override - public void setReadyFutures(Object txIdentifier, List> readyFutures) { - throw new IllegalStateException("No transaction is allocated"); + public List> getPreviousReadyFutures() { + return Collections.emptyList(); } } @@ -88,21 +71,15 @@ public class TransactionChainProxy implements DOMStoreTransactionChain { } }; - private static final SimpleEntry>> EMPTY_READY_FUTURES = - new SimpleEntry>>("", - Collections.>emptyList()); - - private static final AtomicReferenceFieldUpdater STATE_UPDATER = - AtomicReferenceFieldUpdater.newUpdater(TransactionChainProxy.class, State.class, "state"); + private static final AtomicInteger counter = new AtomicInteger(0); private final ActorContext actorContext; private final String transactionChainId; - private volatile State state = IDLE_STATE; - private static final AtomicInteger counter = new AtomicInteger(0); + private volatile State currentState = IDLE_STATE; public TransactionChainProxy(ActorContext actorContext) { this.actorContext = actorContext; - transactionChainId = actorContext.getCurrentMemberName() + "-transaction-chain-" + counter.incrementAndGet(); + transactionChainId = actorContext.getCurrentMemberName() + "-txn-chain-" + counter.incrementAndGet(); } public String getTransactionChainId() { @@ -111,111 +88,48 @@ public class TransactionChainProxy implements DOMStoreTransactionChain { @Override public DOMStoreReadTransaction newReadOnlyTransaction() { - checkReadyState(); - return new ChainedTransactionProxy(actorContext, TransactionProxy.TransactionType.READ_ONLY); + State localState = currentState; + checkReadyState(localState); + + return new ChainedTransactionProxy(actorContext, TransactionProxy.TransactionType.READ_ONLY, + transactionChainId, localState.getPreviousReadyFutures()); } @Override public DOMStoreReadWriteTransaction newReadWriteTransaction() { + actorContext.acquireTxCreationPermit(); return allocateWriteTransaction(TransactionProxy.TransactionType.READ_WRITE); } @Override public DOMStoreWriteTransaction newWriteOnlyTransaction() { + actorContext.acquireTxCreationPermit(); return allocateWriteTransaction(TransactionProxy.TransactionType.WRITE_ONLY); } @Override public void close() { - state = CLOSED_STATE; + currentState = CLOSED_STATE; // Send a close transaction chain request to each and every shard - actorContext.broadcast(new CloseTransactionChain(transactionChainId)); + actorContext.broadcast(new CloseTransactionChain(transactionChainId).toSerializable()); } private ChainedTransactionProxy allocateWriteTransaction(TransactionProxy.TransactionType type) { - checkReadyState(); + State localState = currentState; - ChainedTransactionProxy txProxy = new ChainedTransactionProxy(actorContext, type); - STATE_UPDATER.compareAndSet(this, IDLE_STATE, new Allocated()); + checkReadyState(localState); - return txProxy; - } - - private void checkReadyState() { - Preconditions.checkState(state.isReady(), "Previous transaction %s is not ready yet", - state.getReadyFutures().getKey()); - } - - private class ChainedTransactionProxy extends TransactionProxy { + // Pass the ready Futures from the previous Tx. + ChainedTransactionProxy txProxy = new ChainedTransactionProxy(actorContext, type, + transactionChainId, localState.getPreviousReadyFutures()); - ChainedTransactionProxy(ActorContext actorContext, TransactionType transactionType) { - super(actorContext, transactionType, transactionChainId); - } + currentState = new Allocated(txProxy); - @Override - protected void onTransactionReady(List> readyFutures) { - LOG.debug("onTransactionReady {} pending readyFutures size {} chain {}", getIdentifier(), readyFutures.size(), TransactionChainProxy.this.transactionChainId); - state.setReadyFutures(getIdentifier(), readyFutures); - } + return txProxy; + } - /** - * This method is overridden to ensure the previous Tx's ready operations complete - * before we create the next shard Tx in the chain to avoid creation failures if the - * previous Tx's ready operations haven't completed yet. - */ - @Override - protected Future sendCreateTransaction(final ActorSelection shard, - final Object serializedCreateMessage) { - - // Check if there are any previous ready Futures, otherwise let the super class handle it. - // The second check is done to ensure the the previous ready Futures aren't for this - // Tx instance as deadlock would occur if we tried to wait on our own Futures. This can - // occur in this scenario: - // - // - the TransactionProxy is created and the client does a write. - // - // - the TransactionProxy then attempts to create the shard Tx. However it first - // sends a FindPrimaryShard message to the shard manager to find the local shard - // This call is done async. - // - // - the client submits the Tx and the TransactionProxy is readied and we cache - // the ready Futures here. - // - // - then the FindPrimaryShard call completes and this method is called to create - // the shard Tx. However the cached Futures were from the ready on this Tx. If we - // tried to wait on them, it would cause a form of deadlock as the ready Future - // would be waiting on the Tx create Future and vice versa. - SimpleEntry>> readyFuturesEntry = state.getReadyFutures(); - List> readyFutures = readyFuturesEntry.getValue(); - if(readyFutures.isEmpty() || getIdentifier().equals(readyFuturesEntry.getKey())) { - return super.sendCreateTransaction(shard, serializedCreateMessage); - } - - // Combine the ready Futures into 1. - Future> combinedFutures = akka.dispatch.Futures.sequence( - readyFutures, actorContext.getActorSystem().dispatcher()); - - // Add a callback for completion of the combined Futures. - final Promise createTxPromise = akka.dispatch.Futures.promise(); - OnComplete> onComplete = new OnComplete>() { - @Override - public void onComplete(Throwable failure, Iterable notUsed) { - if(failure != null) { - // A Ready Future failed so fail the returned Promise. - createTxPromise.failure(failure); - } else { - // Send the CreateTx message and use the resulting Future to complete the - // returned Promise. - createTxPromise.completeWith(actorContext.executeOperationAsync(shard, - serializedCreateMessage)); - } - } - }; - - combinedFutures.onComplete(onComplete, actorContext.getActorSystem().dispatcher()); - - return createTxPromise.future(); - } + private void checkReadyState(State state) { + Preconditions.checkState(state.isReady(), "Previous transaction is not ready yet"); } }