import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
+import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
+import javax.annotation.Nonnull;
import javax.annotation.concurrent.GuardedBy;
import org.opendaylight.controller.md.sal.common.api.TransactionStatus;
import org.opendaylight.controller.md.sal.common.api.data.AsyncTransaction;
private static final Logger LOG = LoggerFactory.getLogger(PingPongTransactionChain.class);
private final DOMTransactionChain delegate;
- @GuardedBy("this")
- private PingPongTransaction bufferTransaction;
- @GuardedBy("this")
- private PingPongTransaction inflightTransaction;
- @GuardedBy("this")
- private boolean haveLocked;
@GuardedBy("this")
private boolean failed;
+ /**
+ * This updater is used to manipulate the "ready" transaction. We perform only atomic
+ * get-and-set on it.
+ */
+ private static final AtomicReferenceFieldUpdater<PingPongTransactionChain, PingPongTransaction> READY_UPDATER =
+ AtomicReferenceFieldUpdater.newUpdater(PingPongTransactionChain.class, PingPongTransaction.class, "readyTx");
+ private volatile PingPongTransaction readyTx;
+
+ /**
+ * This updater is used to manipulate the "locked" transaction. A locked transaction
+ * means we know that the user still holds a transaction and should at some point call
+ * us. We perform on compare-and-swap to ensure we properly detect when a user is
+ * attempting to allocated multiple transactions concurrently.
+ */
+ private static final AtomicReferenceFieldUpdater<PingPongTransactionChain, PingPongTransaction> LOCKED_UPDATER =
+ AtomicReferenceFieldUpdater.newUpdater(PingPongTransactionChain.class, PingPongTransaction.class, "lockedTx");
+ private volatile PingPongTransaction lockedTx;
+
+ /**
+ * This updater is used to manipulate the "inflight" transaction. There can be at most
+ * one of these at any given time. We perform only compare-and-swap on these.
+ */
+ private static final AtomicReferenceFieldUpdater<PingPongTransactionChain, PingPongTransaction> INFLIGHT_UPDATER =
+ AtomicReferenceFieldUpdater.newUpdater(PingPongTransactionChain.class, PingPongTransaction.class, "inflightTx");
+ private volatile PingPongTransaction inflightTx;
+
PingPongTransactionChain(final DOMDataBroker broker, final TransactionChainListener listener) {
this.delegate = broker.createTransactionChain(new TransactionChainListener() {
@Override
LOG.debug("Delegate chain {} reported failure in {}", chain, transaction, cause);
final DOMDataReadWriteTransaction frontend;
- if (inflightTransaction == null) {
+ final PingPongTransaction tx = inflightTx;
+ if (tx == null) {
LOG.warn("Transaction chain {} failed with no pending transactions", chain);
frontend = null;
} else {
- frontend = inflightTransaction.getFrontendTransaction();
+ frontend = tx.getFrontendTransaction();
}
- listener.onTransactionChainFailed(PingPongTransactionChain.this, frontend , cause);
+ listener.onTransactionChainFailed(PingPongTransactionChain.this, frontend, cause);
delegateFailed();
}
private synchronized void delegateFailed() {
failed = true;
- if (!haveLocked) {
- processBuffer();
+
+ /*
+ * If we do not have a locked transaction, we need to ensure that
+ * the backend transaction is cancelled. Otherwise we can defer
+ * until the user calls us.
+ */
+ if (lockedTx == null) {
+ processIfReady();
}
}
- private synchronized PingPongTransaction allocateTransaction() {
- Preconditions.checkState(!haveLocked, "Attempted to start a transaction while a previous one is still outstanding");
- Preconditions.checkState(!failed, "Attempted to use a failed chain");
+ private synchronized PingPongTransaction slowAllocateTransaction() {
+ final DOMDataReadWriteTransaction delegateTx = delegate.newReadWriteTransaction();
+ final PingPongTransaction newTx = new PingPongTransaction(delegateTx);
- if (bufferTransaction == null) {
- bufferTransaction = new PingPongTransaction(delegate.newReadWriteTransaction());
+ if (!LOCKED_UPDATER.compareAndSet(this, null, newTx)) {
+ delegateTx.cancel();
+ throw new IllegalStateException(String.format("New transaction %s raced with transacion %s", newTx, lockedTx));
}
- haveLocked = true;
- return bufferTransaction;
+ return newTx;
}
- @GuardedBy("this")
- private void processBuffer() {
- final PingPongTransaction tx = bufferTransaction;
+ private PingPongTransaction allocateTransaction() {
+ // Step 1: acquire current state
+ final PingPongTransaction oldTx = READY_UPDATER.getAndSet(this, null);
- if (tx != null) {
- if (failed) {
- LOG.debug("Cancelling transaction {}", tx);
- tx.getTransaction().cancel();
- bufferTransaction = null;
- return;
- }
+ // Slow path: allocate a delegate transaction
+ if (oldTx == null) {
+ return slowAllocateTransaction();
+ }
- LOG.debug("Submitting transaction {}", tx);
- final CheckedFuture<Void, ?> f = tx.getTransaction().submit();
- bufferTransaction = null;
- inflightTransaction = tx;
+ // Fast path: reuse current transaction. We will check
+ // failures and similar on submit().
+ if (!LOCKED_UPDATER.compareAndSet(this, null, oldTx)) {
+ // Ouch. Delegate chain has not detected a duplicate
+ // transaction allocation. This is the best we can do.
+ oldTx.getTransaction().cancel();
+ throw new IllegalStateException(String.format("Reusable transaction %s raced with transaction %s", oldTx, lockedTx));
+ }
- Futures.addCallback(f, new FutureCallback<Void>() {
- @Override
- public void onSuccess(final Void result) {
- transactionSuccessful(tx, result);
- }
+ return oldTx;
+ }
- @Override
- public void onFailure(final Throwable t) {
- transactionFailed(tx, t);
- }
- });
+ /*
+ * This forces allocateTransaction() on a slow path, which has to happen after
+ * this method has completed executing.
+ */
+ @GuardedBy("this")
+ private void processIfReady() {
+ final PingPongTransaction tx = READY_UPDATER.getAndSet(this, null);
+ if (tx != null) {
+ processTransaction(tx);
}
}
+ /**
+ * Process a ready transaction. The caller needs to ensure that
+ * each transaction is seen only once by this method.
+ *
+ * @param tx Transaction which needs processing.
+ */
+ @GuardedBy("this")
+ private void processTransaction(final @Nonnull PingPongTransaction tx) {
+ if (failed) {
+ LOG.debug("Cancelling transaction {}", tx);
+ tx.getTransaction().cancel();
+ return;
+ }
+
+ LOG.debug("Submitting transaction {}", tx);
+ if (!INFLIGHT_UPDATER.compareAndSet(this, null, tx)) {
+ LOG.warn("Submitting transaction {} while {} is still running", tx, inflightTx);
+ }
+
+ Futures.addCallback(tx.getTransaction().submit(), new FutureCallback<Void>() {
+ @Override
+ public void onSuccess(final Void result) {
+ transactionSuccessful(tx, result);
+ }
+
+ @Override
+ public void onFailure(final Throwable t) {
+ transactionFailed(tx, t);
+ }
+ });
+ }
+
private void transactionSuccessful(final PingPongTransaction tx, final Void result) {
LOG.debug("Transaction {} completed successfully", tx);
- synchronized (this) {
- Preconditions.checkState(inflightTransaction == tx, "Successful transaction %s while %s was submitted", tx, inflightTransaction);
- inflightTransaction = null;
+ final boolean success = INFLIGHT_UPDATER.compareAndSet(this, tx, null);
+ Preconditions.checkState(success, "Successful transaction %s while %s was submitted", tx, inflightTx);
- if (!haveLocked) {
- processBuffer();
- }
+ synchronized (this) {
+ processIfReady();
}
// Can run unsynchronized
private void transactionFailed(final PingPongTransaction tx, final Throwable t) {
LOG.debug("Transaction {} failed", tx, t);
- synchronized (this) {
- Preconditions.checkState(inflightTransaction == tx, "Failed transaction %s while %s was submitted", tx, inflightTransaction);
- inflightTransaction = null;
- }
+ final boolean success = INFLIGHT_UPDATER.compareAndSet(this, tx, null);
+ Preconditions.checkState(success, "Failed transaction %s while %s was submitted", tx, inflightTx);
tx.onFailure(t);
}
- private synchronized void readyTransaction(final PingPongTransaction tx) {
- Preconditions.checkState(haveLocked, "Attempted to submit transaction while it is not outstanding");
- Preconditions.checkState(bufferTransaction == tx, "Attempted to submit transaction %s while we have %s", tx, bufferTransaction);
-
- haveLocked = false;
- LOG.debug("Transaction {} unlocked", bufferTransaction);
-
- if (inflightTransaction == null) {
- processBuffer();
+ private void readyTransaction(final @Nonnull PingPongTransaction tx) {
+ // First mark the transaction as not locked.
+ final boolean lockedMatch = LOCKED_UPDATER.compareAndSet(this, tx, null);
+ Preconditions.checkState(lockedMatch, "Attempted to submit transaction %s while we have %s", tx, lockedTx);
+ LOG.debug("Transaction {} unlocked", tx);
+
+ /*
+ * The transaction is ready. It will then be picked up by either next allocation,
+ * or a background transaction completion callback.
+ */
+ final boolean success = READY_UPDATER.compareAndSet(this, null, tx);
+ Preconditions.checkState(success, "Transaction %s collided on ready state", tx, readyTx);
+ LOG.debug("Transaction {} readied", tx);
+
+ /*
+ * We do not see a transaction being in-flight, so we need to take care of dispatching
+ * the transaction to the backend. We are in the ready case, we cannot short-cut
+ * the checking of readyTx, as an in-flight transaction may have completed between us
+ * setting the field above and us checking.
+ */
+ if (inflightTx == null) {
+ synchronized (this) {
+ processIfReady();
+ }
}
}
@Override
- public synchronized void close() {
- Preconditions.checkState(!haveLocked, "Attempted to close chain while a transaction is outstanding");
- processBuffer();
- delegate.close();
+ public void close() {
+ final PingPongTransaction notLocked = lockedTx;
+ Preconditions.checkState(notLocked == null, "Attempted to close chain with outstanding transaction %s", notLocked);
+
+ synchronized (this) {
+ processIfReady();
+ delegate.close();
+ }
}
@Override