private static final Logger LOG = LoggerFactory.getLogger(PingPongTransactionChain.class);
private final DOMTransactionChain delegate;
- @GuardedBy("this")
- private PingPongTransaction inflightTransaction;
@GuardedBy("this")
private boolean failed;
+ @GuardedBy("this")
+ private PingPongTransaction shutdownTx;
/**
* This updater is used to manipulate the "ready" transaction. We perform only atomic
*/
private static final AtomicReferenceFieldUpdater<PingPongTransactionChain, PingPongTransaction> READY_UPDATER =
AtomicReferenceFieldUpdater.newUpdater(PingPongTransactionChain.class, PingPongTransaction.class, "readyTx");
- @SuppressWarnings("unused") // Accessed via READY_UPDATER
private volatile PingPongTransaction readyTx;
/**
AtomicReferenceFieldUpdater.newUpdater(PingPongTransactionChain.class, PingPongTransaction.class, "lockedTx");
private volatile PingPongTransaction lockedTx;
+ /**
+ * This updater is used to manipulate the "inflight" transaction. There can be at most
+ * one of these at any given time. We perform only compare-and-swap on these.
+ */
+ private static final AtomicReferenceFieldUpdater<PingPongTransactionChain, PingPongTransaction> INFLIGHT_UPDATER =
+ AtomicReferenceFieldUpdater.newUpdater(PingPongTransactionChain.class, PingPongTransaction.class, "inflightTx");
+ private volatile PingPongTransaction inflightTx;
+
PingPongTransactionChain(final DOMDataBroker broker, final TransactionChainListener listener) {
this.delegate = broker.createTransactionChain(new TransactionChainListener() {
@Override
LOG.debug("Delegate chain {} reported failure in {}", chain, transaction, cause);
final DOMDataReadWriteTransaction frontend;
- if (inflightTransaction == null) {
+ final PingPongTransaction tx = inflightTx;
+ if (tx == null) {
LOG.warn("Transaction chain {} failed with no pending transactions", chain);
frontend = null;
} else {
- frontend = inflightTransaction.getFrontendTransaction();
+ frontend = tx.getFrontendTransaction();
}
- listener.onTransactionChainFailed(PingPongTransactionChain.this, frontend , cause);
+ listener.onTransactionChainFailed(PingPongTransactionChain.this, frontend, cause);
delegateFailed();
}
}
private synchronized PingPongTransaction slowAllocateTransaction() {
+ Preconditions.checkState(shutdownTx == null, "Transaction chain %s has been shut down", this);
+
final DOMDataReadWriteTransaction delegateTx = delegate.newReadWriteTransaction();
final PingPongTransaction newTx = new PingPongTransaction(delegateTx);
return oldTx;
}
- // This forces allocateTransaction() on a slow path
+ /*
+ * This forces allocateTransaction() on a slow path, which has to happen after
+ * this method has completed executing. Also inflightTx may be updated outside
+ * the lock, hence we need to re-check.
+ */
@GuardedBy("this")
private void processIfReady() {
- final PingPongTransaction tx = READY_UPDATER.getAndSet(this, null);
- if (tx != null) {
- processTransaction(tx);
+ if (inflightTx == null) {
+ final PingPongTransaction tx = READY_UPDATER.getAndSet(this, null);
+ if (tx != null) {
+ processTransaction(tx);
+ }
}
}
* @param tx Transaction which needs processing.
*/
@GuardedBy("this")
- private void processTransaction(final @Nonnull PingPongTransaction tx) {
+ private void processTransaction(@Nonnull final PingPongTransaction tx) {
if (failed) {
LOG.debug("Cancelling transaction {}", tx);
tx.getTransaction().cancel();
}
LOG.debug("Submitting transaction {}", tx);
- final CheckedFuture<Void, ?> f = tx.getTransaction().submit();
- inflightTransaction = tx;
+ if (!INFLIGHT_UPDATER.compareAndSet(this, null, tx)) {
+ LOG.warn("Submitting transaction {} while {} is still running", tx, inflightTx);
+ }
- Futures.addCallback(f, new FutureCallback<Void>() {
+ Futures.addCallback(tx.getTransaction().submit(), new FutureCallback<Void>() {
@Override
public void onSuccess(final Void result) {
transactionSuccessful(tx, result);
});
}
- private void transactionSuccessful(final PingPongTransaction tx, final Void result) {
- LOG.debug("Transaction {} completed successfully", tx);
+ private void processNextTransaction(final PingPongTransaction tx) {
+ final boolean success = INFLIGHT_UPDATER.compareAndSet(this, tx, null);
+ Preconditions.checkState(success, "Completed transaction %s while %s was submitted", tx, inflightTx);
synchronized (this) {
- Preconditions.checkState(inflightTransaction == tx, "Successful transaction %s while %s was submitted", tx, inflightTransaction);
-
- inflightTransaction = null;
- processIfReady();
+ final PingPongTransaction nextTx = READY_UPDATER.getAndSet(this, null);
+ if (nextTx != null) {
+ processTransaction(nextTx);
+ } else if (shutdownTx != null) {
+ processTransaction(shutdownTx);
+ delegate.close();
+ shutdownTx = null;
+ }
}
+ }
+
+ private void transactionSuccessful(final PingPongTransaction tx, final Void result) {
+ LOG.debug("Transaction {} completed successfully", tx);
- // Can run unsynchronized
tx.onSuccess(result);
+ processNextTransaction(tx);
}
private void transactionFailed(final PingPongTransaction tx, final Throwable t) {
LOG.debug("Transaction {} failed", tx, t);
- synchronized (this) {
- Preconditions.checkState(inflightTransaction == tx, "Failed transaction %s while %s was submitted", tx, inflightTransaction);
- inflightTransaction = null;
- }
-
tx.onFailure(t);
+ processNextTransaction(tx);
}
- private void readyTransaction(final @Nonnull PingPongTransaction tx) {
+ private void readyTransaction(@Nonnull final PingPongTransaction tx) {
+ // First mark the transaction as not locked.
final boolean lockedMatch = LOCKED_UPDATER.compareAndSet(this, tx, null);
Preconditions.checkState(lockedMatch, "Attempted to submit transaction %s while we have %s", tx, lockedTx);
-
LOG.debug("Transaction {} unlocked", tx);
- synchronized (this) {
- if (inflightTransaction == null) {
- processTransaction(tx);
+ /*
+ * The transaction is ready. It will then be picked up by either next allocation,
+ * or a background transaction completion callback.
+ */
+ final boolean success = READY_UPDATER.compareAndSet(this, null, tx);
+ Preconditions.checkState(success, "Transaction %s collided on ready state", tx, readyTx);
+ LOG.debug("Transaction {} readied", tx);
+
+ /*
+ * We do not see a transaction being in-flight, so we need to take care of dispatching
+ * the transaction to the backend. We are in the ready case, we cannot short-cut
+ * the checking of readyTx, as an in-flight transaction may have completed between us
+ * setting the field above and us checking.
+ */
+ if (inflightTx == null) {
+ synchronized (this) {
+ processIfReady();
}
}
}
@Override
- public void close() {
+ public synchronized void close() {
final PingPongTransaction notLocked = lockedTx;
Preconditions.checkState(notLocked == null, "Attempted to close chain with outstanding transaction %s", notLocked);
- synchronized (this) {
- processIfReady();
+ // This is not reliable, but if we observe it to be null and the process has already completed,
+ // the backend transaction chain will throw the appropriate error.
+ Preconditions.checkState(shutdownTx == null, "Attempted to close an already-closed chain");
+
+ // Force allocations on slow path, picking up a potentially-outstanding transaction
+ final PingPongTransaction tx = READY_UPDATER.getAndSet(this, null);
+
+ if (tx != null) {
+ // We have one more transaction, which needs to be processed somewhere. If we do not
+ // a transaction in-flight, we need to push it down ourselves.
+ // If there is an in-flight transaction we will schedule this last one into a dedicated
+ // slot. Allocation slow path will check its presence and fail, the in-flight path will
+ // pick it up, submit and immediately close the chain.
+ if (inflightTx == null) {
+ processTransaction(tx);
+ delegate.close();
+ } else {
+ shutdownTx = tx;
+ }
+ } else {
+ // Nothing outstanding, we can safely shutdown
delegate.close();
}
}