X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2FTransactionChainProxy.java;h=87959efe8ae2def5684e253f2e0840c7177db838;hp=2e671e3ce2f267807837225c95376847c6116eb8;hb=37f0504d391efd8b7d61403759fcc22a6dd3a093;hpb=4dd0108a91d84a7133ef6b781911e87903981bc1 diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionChainProxy.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionChainProxy.java index 2e671e3ce2..87959efe8a 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionChainProxy.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionChainProxy.java @@ -11,11 +11,9 @@ package org.opendaylight.controller.cluster.datastore; import akka.actor.ActorSelection; import akka.dispatch.OnComplete; import com.google.common.base.Preconditions; -import java.util.AbstractMap.SimpleEntry; import java.util.Collections; import java.util.List; import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; import org.opendaylight.controller.cluster.datastore.messages.CloseTransactionChain; import org.opendaylight.controller.cluster.datastore.utils.ActorContext; import org.opendaylight.controller.md.sal.common.api.data.TransactionChainClosedException; @@ -38,39 +36,31 @@ public class TransactionChainProxy implements DOMStoreTransactionChain { private interface State { boolean isReady(); - SimpleEntry>> getReadyFutures(); - - void setReadyFutures(Object txIdentifier, List> readyFutures); + List> getPreviousReadyFutures(); } private static class Allocated implements State { - private volatile SimpleEntry>> readyFutures; + private final ChainedTransactionProxy transaction; - @Override - public boolean isReady() { - return readyFutures != null; + Allocated(ChainedTransactionProxy transaction) { + this.transaction = transaction; } @Override - public SimpleEntry>> getReadyFutures() { - return readyFutures != null ? readyFutures : EMPTY_READY_FUTURES; + public boolean isReady() { + return transaction.isReady(); } @Override - public void setReadyFutures(Object txIdentifier, List> readyFutures) { - this.readyFutures = new SimpleEntry<>(txIdentifier, readyFutures); + public List> getPreviousReadyFutures() { + return transaction.getReadyFutures(); } } private static abstract class AbstractDefaultState implements State { @Override - public SimpleEntry>> getReadyFutures() { - return EMPTY_READY_FUTURES; - } - - @Override - public void setReadyFutures(Object txIdentifier, List> readyFutures) { - throw new IllegalStateException("No transaction is allocated"); + public List> getPreviousReadyFutures() { + return Collections.emptyList(); } } @@ -88,21 +78,15 @@ public class TransactionChainProxy implements DOMStoreTransactionChain { } }; - private static final SimpleEntry>> EMPTY_READY_FUTURES = - new SimpleEntry>>("", - Collections.>emptyList()); - - private static final AtomicReferenceFieldUpdater STATE_UPDATER = - AtomicReferenceFieldUpdater.newUpdater(TransactionChainProxy.class, State.class, "state"); + private static final AtomicInteger counter = new AtomicInteger(0); private final ActorContext actorContext; private final String transactionChainId; - private volatile State state = IDLE_STATE; - private static final AtomicInteger counter = new AtomicInteger(0); + private volatile State currentState = IDLE_STATE; public TransactionChainProxy(ActorContext actorContext) { this.actorContext = actorContext; - transactionChainId = actorContext.getCurrentMemberName() + "-transaction-chain-" + counter.incrementAndGet(); + transactionChainId = actorContext.getCurrentMemberName() + "-txn-chain-" + counter.incrementAndGet(); } public String getTransactionChainId() { @@ -111,8 +95,11 @@ public class TransactionChainProxy implements DOMStoreTransactionChain { @Override public DOMStoreReadTransaction newReadOnlyTransaction() { - checkReadyState(); - return new ChainedTransactionProxy(actorContext, TransactionProxy.TransactionType.READ_ONLY); + State localState = currentState; + checkReadyState(localState); + + return new ChainedTransactionProxy(actorContext, TransactionProxy.TransactionType.READ_ONLY, + transactionChainId, localState.getPreviousReadyFutures()); } @Override @@ -127,36 +114,61 @@ public class TransactionChainProxy implements DOMStoreTransactionChain { @Override public void close() { - state = CLOSED_STATE; + currentState = CLOSED_STATE; // Send a close transaction chain request to each and every shard actorContext.broadcast(new CloseTransactionChain(transactionChainId)); } private ChainedTransactionProxy allocateWriteTransaction(TransactionProxy.TransactionType type) { - checkReadyState(); + State localState = currentState; + + checkReadyState(localState); - ChainedTransactionProxy txProxy = new ChainedTransactionProxy(actorContext, type); - STATE_UPDATER.compareAndSet(this, IDLE_STATE, new Allocated()); + // Pass the ready Futures from the previous Tx. + ChainedTransactionProxy txProxy = new ChainedTransactionProxy(actorContext, type, + transactionChainId, localState.getPreviousReadyFutures()); + + currentState = new Allocated(txProxy); return txProxy; } - private void checkReadyState() { - Preconditions.checkState(state.isReady(), "Previous transaction %s is not ready yet", - state.getReadyFutures().getKey()); + private void checkReadyState(State state) { + Preconditions.checkState(state.isReady(), "Previous transaction is not ready yet"); } - private class ChainedTransactionProxy extends TransactionProxy { + private static class ChainedTransactionProxy extends TransactionProxy { + + /** + * Stores the ready Futures from the previous Tx in the chain. + */ + private final List> previousReadyFutures; + + /** + * Stores the ready Futures from this transaction when it is readied. + */ + private volatile List> readyFutures; - ChainedTransactionProxy(ActorContext actorContext, TransactionType transactionType) { + private ChainedTransactionProxy(ActorContext actorContext, TransactionType transactionType, + String transactionChainId, List> previousReadyFutures) { super(actorContext, transactionType, transactionChainId); + this.previousReadyFutures = previousReadyFutures; + } + + List> getReadyFutures() { + return readyFutures; + } + + boolean isReady() { + return readyFutures != null; } @Override protected void onTransactionReady(List> readyFutures) { - LOG.debug("onTransactionReady {} pending readyFutures size {} chain {}", getIdentifier(), readyFutures.size(), TransactionChainProxy.this.transactionChainId); - state.setReadyFutures(getIdentifier(), readyFutures); + LOG.debug("onTransactionReady {} pending readyFutures size {} chain {}", getIdentifier(), + readyFutures.size(), getTransactionChainId()); + this.readyFutures = readyFutures; } /** @@ -169,32 +181,13 @@ public class TransactionChainProxy implements DOMStoreTransactionChain { final Object serializedCreateMessage) { // Check if there are any previous ready Futures, otherwise let the super class handle it. - // The second check is done to ensure the the previous ready Futures aren't for this - // Tx instance as deadlock would occur if we tried to wait on our own Futures. This can - // occur in this scenario: - // - // - the TransactionProxy is created and the client does a write. - // - // - the TransactionProxy then attempts to create the shard Tx. However it first - // sends a FindPrimaryShard message to the shard manager to find the local shard - // This call is done async. - // - // - the client submits the Tx and the TransactionProxy is readied and we cache - // the ready Futures here. - // - // - then the FindPrimaryShard call completes and this method is called to create - // the shard Tx. However the cached Futures were from the ready on this Tx. If we - // tried to wait on them, it would cause a form of deadlock as the ready Future - // would be waiting on the Tx create Future and vice versa. - SimpleEntry>> readyFuturesEntry = state.getReadyFutures(); - List> readyFutures = readyFuturesEntry.getValue(); - if(readyFutures.isEmpty() || getIdentifier().equals(readyFuturesEntry.getKey())) { + if(previousReadyFutures.isEmpty()) { return super.sendCreateTransaction(shard, serializedCreateMessage); } // Combine the ready Futures into 1. Future> combinedFutures = akka.dispatch.Futures.sequence( - readyFutures, actorContext.getActorSystem().dispatcher()); + previousReadyFutures, getActorContext().getActorSystem().dispatcher()); // Add a callback for completion of the combined Futures. final Promise createTxPromise = akka.dispatch.Futures.promise(); @@ -205,15 +198,18 @@ public class TransactionChainProxy implements DOMStoreTransactionChain { // A Ready Future failed so fail the returned Promise. createTxPromise.failure(failure); } else { + LOG.debug("Previous Tx readied - sending CreateTransaction for {} on chain {}", + getIdentifier(), getTransactionChainId()); + // Send the CreateTx message and use the resulting Future to complete the // returned Promise. - createTxPromise.completeWith(actorContext.executeOperationAsync(shard, + createTxPromise.completeWith(getActorContext().executeOperationAsync(shard, serializedCreateMessage)); } } }; - combinedFutures.onComplete(onComplete, actorContext.getActorSystem().dispatcher()); + combinedFutures.onComplete(onComplete, getActorContext().getActorSystem().dispatcher()); return createTxPromise.future(); }