From: Robert Varga Date: Mon, 24 Nov 2014 23:04:34 +0000 (+0100) Subject: Remove lock out of the congested/reuse case X-Git-Tag: release/lithium~717^2 X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=commitdiff_plain;h=ea292acfe7396e1813f7cc56f6002d2b59ee18c2 Remove lock out of the congested/reuse case Reusing a transaction is a simple state transition inside the chain, which helps the system throughput, since more changes will be packed in a single transaction. This patch optimizes this reuse as a fast path which is performed without taking the lock on the assumption that this will allow better code movement. Change-Id: Idd8ba65c0d45c7ceea42d50ea3a6a521e0123733 Signed-off-by: Robert Varga --- diff --git a/opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/md/sal/dom/broker/impl/PingPongTransactionChain.java b/opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/md/sal/dom/broker/impl/PingPongTransactionChain.java index 83f31b99a3..b3c03b3185 100644 --- a/opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/md/sal/dom/broker/impl/PingPongTransactionChain.java +++ b/opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/md/sal/dom/broker/impl/PingPongTransactionChain.java @@ -13,6 +13,8 @@ import com.google.common.util.concurrent.CheckedFuture; import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; +import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; +import javax.annotation.Nonnull; import javax.annotation.concurrent.GuardedBy; import org.opendaylight.controller.md.sal.common.api.TransactionStatus; import org.opendaylight.controller.md.sal.common.api.data.AsyncTransaction; @@ -53,15 +55,30 @@ public final class PingPongTransactionChain implements DOMTransactionChain { private static final Logger LOG = LoggerFactory.getLogger(PingPongTransactionChain.class); private final DOMTransactionChain delegate; - @GuardedBy("this") - private PingPongTransaction bufferTransaction; @GuardedBy("this") private PingPongTransaction inflightTransaction; @GuardedBy("this") - private boolean haveLocked; - @GuardedBy("this") private boolean failed; + /** + * This updater is used to manipulate the "ready" transaction. We perform only atomic + * get-and-set on it. + */ + private static final AtomicReferenceFieldUpdater READY_UPDATER = + AtomicReferenceFieldUpdater.newUpdater(PingPongTransactionChain.class, PingPongTransaction.class, "readyTx"); + @SuppressWarnings("unused") // Accessed via READY_UPDATER + private volatile PingPongTransaction readyTx; + + /** + * This updater is used to manipulate the "locked" transaction. A locked transaction + * means we know that the user still holds a transaction and should at some point call + * us. We perform on compare-and-swap to ensure we properly detect when a user is + * attempting to allocated multiple transactions concurrently. + */ + private static final AtomicReferenceFieldUpdater LOCKED_UPDATER = + AtomicReferenceFieldUpdater.newUpdater(PingPongTransactionChain.class, PingPongTransaction.class, "lockedTx"); + private volatile PingPongTransaction lockedTx; + PingPongTransactionChain(final DOMDataBroker broker, final TransactionChainListener listener) { this.delegate = broker.createTransactionChain(new TransactionChainListener() { @Override @@ -89,52 +106,88 @@ public final class PingPongTransactionChain implements DOMTransactionChain { private synchronized void delegateFailed() { failed = true; - if (!haveLocked) { - processBuffer(); + + /* + * If we do not have a locked transaction, we need to ensure that + * the backend transaction is cancelled. Otherwise we can defer + * until the user calls us. + */ + if (lockedTx == null) { + processIfReady(); } } - private synchronized PingPongTransaction allocateTransaction() { - Preconditions.checkState(!haveLocked, "Attempted to start a transaction while a previous one is still outstanding"); - Preconditions.checkState(!failed, "Attempted to use a failed chain"); + private synchronized PingPongTransaction slowAllocateTransaction() { + final DOMDataReadWriteTransaction delegateTx = delegate.newReadWriteTransaction(); + final PingPongTransaction newTx = new PingPongTransaction(delegateTx); - if (bufferTransaction == null) { - bufferTransaction = new PingPongTransaction(delegate.newReadWriteTransaction()); + if (!LOCKED_UPDATER.compareAndSet(this, null, newTx)) { + delegateTx.cancel(); + throw new IllegalStateException(String.format("New transaction %s raced with transacion %s", newTx, lockedTx)); } - haveLocked = true; - return bufferTransaction; + return newTx; } - @GuardedBy("this") - private void processBuffer() { - final PingPongTransaction tx = bufferTransaction; + private PingPongTransaction allocateTransaction() { + // Step 1: acquire current state + final PingPongTransaction oldTx = READY_UPDATER.getAndSet(this, null); - if (tx != null) { - if (failed) { - LOG.debug("Cancelling transaction {}", tx); - tx.getTransaction().cancel(); - bufferTransaction = null; - return; - } + // Slow path: allocate a delegate transaction + if (oldTx == null) { + return slowAllocateTransaction(); + } - LOG.debug("Submitting transaction {}", tx); - final CheckedFuture f = tx.getTransaction().submit(); - bufferTransaction = null; - inflightTransaction = tx; + // Fast path: reuse current transaction. We will check + // failures and similar on submit(). + if (!LOCKED_UPDATER.compareAndSet(this, null, oldTx)) { + // Ouch. Delegate chain has not detected a duplicate + // transaction allocation. This is the best we can do. + oldTx.getTransaction().cancel(); + throw new IllegalStateException(String.format("Reusable transaction %s raced with transaction %s", oldTx, lockedTx)); + } - Futures.addCallback(f, new FutureCallback() { - @Override - public void onSuccess(final Void result) { - transactionSuccessful(tx, result); - } + return oldTx; + } - @Override - public void onFailure(final Throwable t) { - transactionFailed(tx, t); - } - }); + // This forces allocateTransaction() on a slow path + @GuardedBy("this") + private void processIfReady() { + final PingPongTransaction tx = READY_UPDATER.getAndSet(this, null); + if (tx != null) { + processTransaction(tx); + } + } + + /** + * Process a ready transaction. The caller needs to ensure that + * each transaction is seen only once by this method. + * + * @param tx Transaction which needs processing. + */ + @GuardedBy("this") + private void processTransaction(final @Nonnull PingPongTransaction tx) { + if (failed) { + LOG.debug("Cancelling transaction {}", tx); + tx.getTransaction().cancel(); + return; } + + LOG.debug("Submitting transaction {}", tx); + final CheckedFuture f = tx.getTransaction().submit(); + inflightTransaction = tx; + + Futures.addCallback(f, new FutureCallback() { + @Override + public void onSuccess(final Void result) { + transactionSuccessful(tx, result); + } + + @Override + public void onFailure(final Throwable t) { + transactionFailed(tx, t); + } + }); } private void transactionSuccessful(final PingPongTransaction tx, final Void result) { @@ -142,11 +195,9 @@ public final class PingPongTransactionChain implements DOMTransactionChain { synchronized (this) { Preconditions.checkState(inflightTransaction == tx, "Successful transaction %s while %s was submitted", tx, inflightTransaction); - inflightTransaction = null; - if (!haveLocked) { - processBuffer(); - } + inflightTransaction = null; + processIfReady(); } // Can run unsynchronized @@ -164,23 +215,28 @@ public final class PingPongTransactionChain implements DOMTransactionChain { tx.onFailure(t); } - private synchronized void readyTransaction(final PingPongTransaction tx) { - Preconditions.checkState(haveLocked, "Attempted to submit transaction while it is not outstanding"); - Preconditions.checkState(bufferTransaction == tx, "Attempted to submit transaction %s while we have %s", tx, bufferTransaction); + private void readyTransaction(final @Nonnull PingPongTransaction tx) { + final boolean lockedMatch = LOCKED_UPDATER.compareAndSet(this, tx, null); + Preconditions.checkState(lockedMatch, "Attempted to submit transaction %s while we have %s", tx, lockedTx); - haveLocked = false; - LOG.debug("Transaction {} unlocked", bufferTransaction); + LOG.debug("Transaction {} unlocked", tx); - if (inflightTransaction == null) { - processBuffer(); + synchronized (this) { + if (inflightTransaction == null) { + processTransaction(tx); + } } } @Override - public synchronized void close() { - Preconditions.checkState(!haveLocked, "Attempted to close chain while a transaction is outstanding"); - processBuffer(); - delegate.close(); + public void close() { + final PingPongTransaction notLocked = lockedTx; + Preconditions.checkState(notLocked == null, "Attempted to close chain with outstanding transaction %s", notLocked); + + synchronized (this) { + processIfReady(); + delegate.close(); + } } @Override