/*
* This forces allocateTransaction() on a slow path, which has to happen after
- * this method has completed executing.
+ * this method has completed executing. Also inflightTx may be updated outside
+ * the lock, hence we need to re-check.
*/
@GuardedBy("this")
private void processIfReady() {
- final PingPongTransaction tx = READY_UPDATER.getAndSet(this, null);
- if (tx != null) {
- processTransaction(tx);
+ if (inflightTx == null) {
+ final PingPongTransaction tx = READY_UPDATER.getAndSet(this, null);
+ if (tx != null) {
+ processTransaction(tx);
+ }
}
}
* @param tx Transaction which needs processing.
*/
@GuardedBy("this")
- private void processTransaction(final @Nonnull PingPongTransaction tx) {
+ private void processTransaction(@Nonnull final PingPongTransaction tx) {
if (failed) {
LOG.debug("Cancelling transaction {}", tx);
tx.getTransaction().cancel();
tx.onFailure(t);
}
- private void readyTransaction(final @Nonnull PingPongTransaction tx) {
+ private void readyTransaction(@Nonnull final PingPongTransaction tx) {
// First mark the transaction as not locked.
final boolean lockedMatch = LOCKED_UPDATER.compareAndSet(this, tx, null);
Preconditions.checkState(lockedMatch, "Attempted to submit transaction %s while we have %s", tx, lockedTx);
}
@Override
- public void close() {
+ public synchronized void close() {
final PingPongTransaction notLocked = lockedTx;
Preconditions.checkState(notLocked == null, "Attempted to close chain with outstanding transaction %s", notLocked);
- synchronized (this) {
- processIfReady();
- delegate.close();
+ // Force allocations on slow path. We will complete the rest
+ final PingPongTransaction tx = READY_UPDATER.getAndSet(this, null);
+
+ // Make sure no transaction is outstanding. Otherwise sleep a bit and retry
+ while (inflightTx != null) {
+ LOG.debug("Busy-waiting for in-flight transaction {} to complete", inflightTx);
+ Thread.yield();
+ continue;
}
+
+ // If we have an outstanding transaction, send it down
+ if (tx != null) {
+ processTransaction(tx);
+ }
+
+ // All done, close the delegate. All new allocations should fail.
+ delegate.close();
}
@Override