Merge "BUG-2560 Canonical write to remote netconf devices"
authorTony Tkacik <ttkacik@cisco.com>
Thu, 8 Jan 2015 13:34:07 +0000 (13:34 +0000)
committerGerrit Code Review <gerrit@opendaylight.org>
Thu, 8 Jan 2015 13:34:07 +0000 (13:34 +0000)
opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/md/sal/dom/broker/impl/PingPongTransactionChain.java

index 83f31b99a3de15f55fa8dcc28ae9b6c0a195ea13..abd348a9c73404b79f0866160f336bdee30aab59 100644 (file)
@@ -13,6 +13,8 @@ import com.google.common.util.concurrent.CheckedFuture;
 import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
+import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
+import javax.annotation.Nonnull;
 import javax.annotation.concurrent.GuardedBy;
 import org.opendaylight.controller.md.sal.common.api.TransactionStatus;
 import org.opendaylight.controller.md.sal.common.api.data.AsyncTransaction;
@@ -53,15 +55,36 @@ public final class PingPongTransactionChain implements DOMTransactionChain {
     private static final Logger LOG = LoggerFactory.getLogger(PingPongTransactionChain.class);
     private final DOMTransactionChain delegate;
 
-    @GuardedBy("this")
-    private PingPongTransaction bufferTransaction;
-    @GuardedBy("this")
-    private PingPongTransaction inflightTransaction;
-    @GuardedBy("this")
-    private boolean haveLocked;
     @GuardedBy("this")
     private boolean failed;
 
+    /**
+     * This updater is used to manipulate the "ready" transaction. We perform only atomic
+     * get-and-set on it.
+     */
+    private static final AtomicReferenceFieldUpdater<PingPongTransactionChain, PingPongTransaction> READY_UPDATER =
+            AtomicReferenceFieldUpdater.newUpdater(PingPongTransactionChain.class, PingPongTransaction.class, "readyTx");
+    @SuppressWarnings("unused") // Accessed via READY_UPDATER
+    private volatile PingPongTransaction readyTx;
+
+    /**
+     * This updater is used to manipulate the "locked" transaction. A locked transaction
+     * means we know that the user still holds a transaction and should at some point call
+     * us. We perform on compare-and-swap to ensure we properly detect when a user is
+     * attempting to allocated multiple transactions concurrently.
+     */
+    private static final AtomicReferenceFieldUpdater<PingPongTransactionChain, PingPongTransaction> LOCKED_UPDATER =
+            AtomicReferenceFieldUpdater.newUpdater(PingPongTransactionChain.class, PingPongTransaction.class, "lockedTx");
+    private volatile PingPongTransaction lockedTx;
+
+    /**
+     * This updater is used to manipulate the "inflight" transaction. There can be at most
+     * one of these at any given time. We perform only compare-and-swap on these.
+     */
+    private static final AtomicReferenceFieldUpdater<PingPongTransactionChain, PingPongTransaction> INFLIGHT_UPDATER =
+            AtomicReferenceFieldUpdater.newUpdater(PingPongTransactionChain.class, PingPongTransaction.class, "inflightTx");
+    private volatile PingPongTransaction inflightTx;
+
     PingPongTransactionChain(final DOMDataBroker broker, final TransactionChainListener listener) {
         this.delegate = broker.createTransactionChain(new TransactionChainListener() {
             @Override
@@ -69,14 +92,15 @@ public final class PingPongTransactionChain implements DOMTransactionChain {
                 LOG.debug("Delegate chain {} reported failure in {}", chain, transaction, cause);
 
                 final DOMDataReadWriteTransaction frontend;
-                if (inflightTransaction == null) {
+                final PingPongTransaction tx = inflightTx;
+                if (tx == null) {
                     LOG.warn("Transaction chain {} failed with no pending transactions", chain);
                     frontend = null;
                 } else {
-                    frontend = inflightTransaction.getFrontendTransaction();
+                    frontend = tx.getFrontendTransaction();
                 }
 
-                listener.onTransactionChainFailed(PingPongTransactionChain.this, frontend , cause);
+                listener.onTransactionChainFailed(PingPongTransactionChain.this, frontend, cause);
                 delegateFailed();
             }
 
@@ -89,64 +113,99 @@ public final class PingPongTransactionChain implements DOMTransactionChain {
 
     private synchronized void delegateFailed() {
         failed = true;
-        if (!haveLocked) {
-            processBuffer();
+
+        /*
+         * If we do not have a locked transaction, we need to ensure that
+         * the backend transaction is cancelled. Otherwise we can defer
+         * until the user calls us.
+         */
+        if (lockedTx == null) {
+            processIfReady();
         }
     }
 
-    private synchronized PingPongTransaction allocateTransaction() {
-        Preconditions.checkState(!haveLocked, "Attempted to start a transaction while a previous one is still outstanding");
-        Preconditions.checkState(!failed, "Attempted to use a failed chain");
+    private synchronized PingPongTransaction slowAllocateTransaction() {
+        final DOMDataReadWriteTransaction delegateTx = delegate.newReadWriteTransaction();
+        final PingPongTransaction newTx = new PingPongTransaction(delegateTx);
 
-        if (bufferTransaction == null) {
-            bufferTransaction = new PingPongTransaction(delegate.newReadWriteTransaction());
+        if (!LOCKED_UPDATER.compareAndSet(this, null, newTx)) {
+            delegateTx.cancel();
+            throw new IllegalStateException(String.format("New transaction %s raced with transacion %s", newTx, lockedTx));
         }
 
-        haveLocked = true;
-        return bufferTransaction;
+        return newTx;
     }
 
-    @GuardedBy("this")
-    private void processBuffer() {
-        final PingPongTransaction tx = bufferTransaction;
+    private PingPongTransaction allocateTransaction() {
+        // Step 1: acquire current state
+        final PingPongTransaction oldTx = READY_UPDATER.getAndSet(this, null);
 
-        if (tx != null) {
-            if (failed) {
-                LOG.debug("Cancelling transaction {}", tx);
-                tx.getTransaction().cancel();
-                bufferTransaction = null;
-                return;
-            }
+        // Slow path: allocate a delegate transaction
+        if (oldTx == null) {
+            return slowAllocateTransaction();
+        }
 
-            LOG.debug("Submitting transaction {}", tx);
-            final CheckedFuture<Void, ?> f = tx.getTransaction().submit();
-            bufferTransaction = null;
-            inflightTransaction = tx;
+        // Fast path: reuse current transaction. We will check
+        //            failures and similar on submit().
+        if (!LOCKED_UPDATER.compareAndSet(this, null, oldTx)) {
+            // Ouch. Delegate chain has not detected a duplicate
+            // transaction allocation. This is the best we can do.
+            oldTx.getTransaction().cancel();
+            throw new IllegalStateException(String.format("Reusable transaction %s raced with transaction %s", oldTx, lockedTx));
+        }
 
-            Futures.addCallback(f, new FutureCallback<Void>() {
-                @Override
-                public void onSuccess(final Void result) {
-                    transactionSuccessful(tx, result);
-                }
+        return oldTx;
+    }
 
-                @Override
-                public void onFailure(final Throwable t) {
-                    transactionFailed(tx, t);
-                }
-            });
+    // This forces allocateTransaction() on a slow path
+    @GuardedBy("this")
+    private void processIfReady() {
+        final PingPongTransaction tx = READY_UPDATER.getAndSet(this, null);
+        if (tx != null) {
+            processTransaction(tx);
         }
     }
 
+    /**
+     * Process a ready transaction. The caller needs to ensure that
+     * each transaction is seen only once by this method.
+     *
+     * @param tx Transaction which needs processing.
+     */
+    @GuardedBy("this")
+    private void processTransaction(final @Nonnull PingPongTransaction tx) {
+        if (failed) {
+            LOG.debug("Cancelling transaction {}", tx);
+            tx.getTransaction().cancel();
+            return;
+        }
+
+        LOG.debug("Submitting transaction {}", tx);
+        if (!INFLIGHT_UPDATER.compareAndSet(this, null, tx)) {
+            LOG.warn("Submitting transaction {} while {} is still running", tx, inflightTx);
+        }
+
+        Futures.addCallback(tx.getTransaction().submit(), new FutureCallback<Void>() {
+            @Override
+            public void onSuccess(final Void result) {
+                transactionSuccessful(tx, result);
+            }
+
+            @Override
+            public void onFailure(final Throwable t) {
+                transactionFailed(tx, t);
+            }
+        });
+    }
+
     private void transactionSuccessful(final PingPongTransaction tx, final Void result) {
         LOG.debug("Transaction {} completed successfully", tx);
 
-        synchronized (this) {
-            Preconditions.checkState(inflightTransaction == tx, "Successful transaction %s while %s was submitted", tx, inflightTransaction);
-            inflightTransaction = null;
+        final boolean success = INFLIGHT_UPDATER.compareAndSet(this, tx, null);
+        Preconditions.checkState(success, "Successful transaction %s while %s was submitted", tx, inflightTx);
 
-            if (!haveLocked) {
-                processBuffer();
-            }
+        synchronized (this) {
+            processIfReady();
         }
 
         // Can run unsynchronized
@@ -156,31 +215,34 @@ public final class PingPongTransactionChain implements DOMTransactionChain {
     private void transactionFailed(final PingPongTransaction tx, final Throwable t) {
         LOG.debug("Transaction {} failed", tx, t);
 
-        synchronized (this) {
-            Preconditions.checkState(inflightTransaction == tx, "Failed transaction %s while %s was submitted", tx, inflightTransaction);
-            inflightTransaction = null;
-        }
+        final boolean success = INFLIGHT_UPDATER.compareAndSet(this, tx, null);
+        Preconditions.checkState(success, "Failed transaction %s while %s was submitted", tx, inflightTx);
 
         tx.onFailure(t);
     }
 
-    private synchronized void readyTransaction(final PingPongTransaction tx) {
-        Preconditions.checkState(haveLocked, "Attempted to submit transaction while it is not outstanding");
-        Preconditions.checkState(bufferTransaction == tx, "Attempted to submit transaction %s while we have %s", tx, bufferTransaction);
+    private void readyTransaction(final @Nonnull PingPongTransaction tx) {
+        final boolean lockedMatch = LOCKED_UPDATER.compareAndSet(this, tx, null);
+        Preconditions.checkState(lockedMatch, "Attempted to submit transaction %s while we have %s", tx, lockedTx);
 
-        haveLocked = false;
-        LOG.debug("Transaction {} unlocked", bufferTransaction);
+        LOG.debug("Transaction {} unlocked", tx);
 
-        if (inflightTransaction == null) {
-            processBuffer();
+        if (inflightTx == null) {
+            synchronized (this) {
+                processTransaction(tx);
+            }
         }
     }
 
     @Override
-    public synchronized void close() {
-        Preconditions.checkState(!haveLocked, "Attempted to close chain while a transaction is outstanding");
-        processBuffer();
-        delegate.close();
+    public void close() {
+        final PingPongTransaction notLocked = lockedTx;
+        Preconditions.checkState(notLocked == null, "Attempted to close chain with outstanding transaction %s", notLocked);
+
+        synchronized (this) {
+            processIfReady();
+            delegate.close();
+        }
     }
 
     @Override