X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-dom-broker%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fmd%2Fsal%2Fdom%2Fbroker%2Fimpl%2FPingPongTransactionChain.java;h=c3a56ed454024b3cb316c1e5bec9d7184c897edc;hp=83f31b99a3de15f55fa8dcc28ae9b6c0a195ea13;hb=1f14b44c584f97e7c992e611e6227e262fe0089e;hpb=d70f418d19fa09b1efc8fa4ce4ed35f0cf59b73b diff --git a/opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/md/sal/dom/broker/impl/PingPongTransactionChain.java b/opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/md/sal/dom/broker/impl/PingPongTransactionChain.java index 83f31b99a3..c3a56ed454 100644 --- a/opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/md/sal/dom/broker/impl/PingPongTransactionChain.java +++ b/opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/md/sal/dom/broker/impl/PingPongTransactionChain.java @@ -13,6 +13,8 @@ import com.google.common.util.concurrent.CheckedFuture; import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; +import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; +import javax.annotation.Nonnull; import javax.annotation.concurrent.GuardedBy; import org.opendaylight.controller.md.sal.common.api.TransactionStatus; import org.opendaylight.controller.md.sal.common.api.data.AsyncTransaction; @@ -53,15 +55,35 @@ public final class PingPongTransactionChain implements DOMTransactionChain { private static final Logger LOG = LoggerFactory.getLogger(PingPongTransactionChain.class); private final DOMTransactionChain delegate; - @GuardedBy("this") - private PingPongTransaction bufferTransaction; - @GuardedBy("this") - private PingPongTransaction inflightTransaction; - @GuardedBy("this") - private boolean haveLocked; @GuardedBy("this") private boolean failed; + /** + * This updater is used to manipulate the "ready" transaction. We perform only atomic + * get-and-set on it. + */ + private static final AtomicReferenceFieldUpdater READY_UPDATER = + AtomicReferenceFieldUpdater.newUpdater(PingPongTransactionChain.class, PingPongTransaction.class, "readyTx"); + private volatile PingPongTransaction readyTx; + + /** + * This updater is used to manipulate the "locked" transaction. A locked transaction + * means we know that the user still holds a transaction and should at some point call + * us. We perform on compare-and-swap to ensure we properly detect when a user is + * attempting to allocated multiple transactions concurrently. + */ + private static final AtomicReferenceFieldUpdater LOCKED_UPDATER = + AtomicReferenceFieldUpdater.newUpdater(PingPongTransactionChain.class, PingPongTransaction.class, "lockedTx"); + private volatile PingPongTransaction lockedTx; + + /** + * This updater is used to manipulate the "inflight" transaction. There can be at most + * one of these at any given time. We perform only compare-and-swap on these. + */ + private static final AtomicReferenceFieldUpdater INFLIGHT_UPDATER = + AtomicReferenceFieldUpdater.newUpdater(PingPongTransactionChain.class, PingPongTransaction.class, "inflightTx"); + private volatile PingPongTransaction inflightTx; + PingPongTransactionChain(final DOMDataBroker broker, final TransactionChainListener listener) { this.delegate = broker.createTransactionChain(new TransactionChainListener() { @Override @@ -69,14 +91,15 @@ public final class PingPongTransactionChain implements DOMTransactionChain { LOG.debug("Delegate chain {} reported failure in {}", chain, transaction, cause); final DOMDataReadWriteTransaction frontend; - if (inflightTransaction == null) { + final PingPongTransaction tx = inflightTx; + if (tx == null) { LOG.warn("Transaction chain {} failed with no pending transactions", chain); frontend = null; } else { - frontend = inflightTransaction.getFrontendTransaction(); + frontend = tx.getFrontendTransaction(); } - listener.onTransactionChainFailed(PingPongTransactionChain.this, frontend , cause); + listener.onTransactionChainFailed(PingPongTransactionChain.this, frontend, cause); delegateFailed(); } @@ -89,64 +112,102 @@ public final class PingPongTransactionChain implements DOMTransactionChain { private synchronized void delegateFailed() { failed = true; - if (!haveLocked) { - processBuffer(); + + /* + * If we do not have a locked transaction, we need to ensure that + * the backend transaction is cancelled. Otherwise we can defer + * until the user calls us. + */ + if (lockedTx == null) { + processIfReady(); } } - private synchronized PingPongTransaction allocateTransaction() { - Preconditions.checkState(!haveLocked, "Attempted to start a transaction while a previous one is still outstanding"); - Preconditions.checkState(!failed, "Attempted to use a failed chain"); + private synchronized PingPongTransaction slowAllocateTransaction() { + final DOMDataReadWriteTransaction delegateTx = delegate.newReadWriteTransaction(); + final PingPongTransaction newTx = new PingPongTransaction(delegateTx); - if (bufferTransaction == null) { - bufferTransaction = new PingPongTransaction(delegate.newReadWriteTransaction()); + if (!LOCKED_UPDATER.compareAndSet(this, null, newTx)) { + delegateTx.cancel(); + throw new IllegalStateException(String.format("New transaction %s raced with transacion %s", newTx, lockedTx)); } - haveLocked = true; - return bufferTransaction; + return newTx; } - @GuardedBy("this") - private void processBuffer() { - final PingPongTransaction tx = bufferTransaction; + private PingPongTransaction allocateTransaction() { + // Step 1: acquire current state + final PingPongTransaction oldTx = READY_UPDATER.getAndSet(this, null); - if (tx != null) { - if (failed) { - LOG.debug("Cancelling transaction {}", tx); - tx.getTransaction().cancel(); - bufferTransaction = null; - return; - } + // Slow path: allocate a delegate transaction + if (oldTx == null) { + return slowAllocateTransaction(); + } - LOG.debug("Submitting transaction {}", tx); - final CheckedFuture f = tx.getTransaction().submit(); - bufferTransaction = null; - inflightTransaction = tx; + // Fast path: reuse current transaction. We will check + // failures and similar on submit(). + if (!LOCKED_UPDATER.compareAndSet(this, null, oldTx)) { + // Ouch. Delegate chain has not detected a duplicate + // transaction allocation. This is the best we can do. + oldTx.getTransaction().cancel(); + throw new IllegalStateException(String.format("Reusable transaction %s raced with transaction %s", oldTx, lockedTx)); + } - Futures.addCallback(f, new FutureCallback() { - @Override - public void onSuccess(final Void result) { - transactionSuccessful(tx, result); - } + return oldTx; + } - @Override - public void onFailure(final Throwable t) { - transactionFailed(tx, t); - } - }); + /* + * This forces allocateTransaction() on a slow path, which has to happen after + * this method has completed executing. + */ + @GuardedBy("this") + private void processIfReady() { + final PingPongTransaction tx = READY_UPDATER.getAndSet(this, null); + if (tx != null) { + processTransaction(tx); } } + /** + * Process a ready transaction. The caller needs to ensure that + * each transaction is seen only once by this method. + * + * @param tx Transaction which needs processing. + */ + @GuardedBy("this") + private void processTransaction(final @Nonnull PingPongTransaction tx) { + if (failed) { + LOG.debug("Cancelling transaction {}", tx); + tx.getTransaction().cancel(); + return; + } + + LOG.debug("Submitting transaction {}", tx); + if (!INFLIGHT_UPDATER.compareAndSet(this, null, tx)) { + LOG.warn("Submitting transaction {} while {} is still running", tx, inflightTx); + } + + Futures.addCallback(tx.getTransaction().submit(), new FutureCallback() { + @Override + public void onSuccess(final Void result) { + transactionSuccessful(tx, result); + } + + @Override + public void onFailure(final Throwable t) { + transactionFailed(tx, t); + } + }); + } + private void transactionSuccessful(final PingPongTransaction tx, final Void result) { LOG.debug("Transaction {} completed successfully", tx); - synchronized (this) { - Preconditions.checkState(inflightTransaction == tx, "Successful transaction %s while %s was submitted", tx, inflightTransaction); - inflightTransaction = null; + final boolean success = INFLIGHT_UPDATER.compareAndSet(this, tx, null); + Preconditions.checkState(success, "Successful transaction %s while %s was submitted", tx, inflightTx); - if (!haveLocked) { - processBuffer(); - } + synchronized (this) { + processIfReady(); } // Can run unsynchronized @@ -156,31 +217,48 @@ public final class PingPongTransactionChain implements DOMTransactionChain { private void transactionFailed(final PingPongTransaction tx, final Throwable t) { LOG.debug("Transaction {} failed", tx, t); - synchronized (this) { - Preconditions.checkState(inflightTransaction == tx, "Failed transaction %s while %s was submitted", tx, inflightTransaction); - inflightTransaction = null; - } + final boolean success = INFLIGHT_UPDATER.compareAndSet(this, tx, null); + Preconditions.checkState(success, "Failed transaction %s while %s was submitted", tx, inflightTx); tx.onFailure(t); } - private synchronized void readyTransaction(final PingPongTransaction tx) { - Preconditions.checkState(haveLocked, "Attempted to submit transaction while it is not outstanding"); - Preconditions.checkState(bufferTransaction == tx, "Attempted to submit transaction %s while we have %s", tx, bufferTransaction); - - haveLocked = false; - LOG.debug("Transaction {} unlocked", bufferTransaction); - - if (inflightTransaction == null) { - processBuffer(); + private void readyTransaction(final @Nonnull PingPongTransaction tx) { + // First mark the transaction as not locked. + final boolean lockedMatch = LOCKED_UPDATER.compareAndSet(this, tx, null); + Preconditions.checkState(lockedMatch, "Attempted to submit transaction %s while we have %s", tx, lockedTx); + LOG.debug("Transaction {} unlocked", tx); + + /* + * The transaction is ready. It will then be picked up by either next allocation, + * or a background transaction completion callback. + */ + final boolean success = READY_UPDATER.compareAndSet(this, null, tx); + Preconditions.checkState(success, "Transaction %s collided on ready state", tx, readyTx); + LOG.debug("Transaction {} readied"); + + /* + * We do not see a transaction being in-flight, so we need to take care of dispatching + * the transaction to the backend. We are in the ready case, we cannot short-cut + * the checking of readyTx, as an in-flight transaction may have completed between us + * setting the field above and us checking. + */ + if (inflightTx == null) { + synchronized (this) { + processIfReady(); + } } } @Override - public synchronized void close() { - Preconditions.checkState(!haveLocked, "Attempted to close chain while a transaction is outstanding"); - processBuffer(); - delegate.close(); + public void close() { + final PingPongTransaction notLocked = lockedTx; + Preconditions.checkState(notLocked == null, "Attempted to close chain with outstanding transaction %s", notLocked); + + synchronized (this) { + processIfReady(); + delegate.close(); + } } @Override