Most of our accesses are compareAndSwap() so we do not benefit all
that much from ARFU type safety. Switch to using VarHandles to access
our atomic fields. Provide an acquireReadyTx() to deal with the few
call sites which would require explicit cast due to using getAndSet().
VarHandles allow us to use compareAndExchange(), which exposes the
witness value -- hence our error paths report the correct object without
a possibility of a race.
Change-Id: I79d2791f6e0d6accb987e46d31415452f735a7f8
Signed-off-by: Robert Varga <robert.varga@pantheon.tech>
(cherry picked from commit
4a94fedbe24344b6dbe67287a560daba8b99eb84)
package org.opendaylight.mdsal.dom.spi;
import static com.google.common.base.Preconditions.checkState;
package org.opendaylight.mdsal.dom.spi;
import static com.google.common.base.Preconditions.checkState;
+import static com.google.common.base.Verify.verify;
import static java.util.Objects.requireNonNull;
import static java.util.Objects.requireNonNull;
-import com.google.common.base.VerifyException;
import com.google.common.util.concurrent.FluentFuture;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.FluentFuture;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.MoreExecutors;
+import java.lang.invoke.MethodHandles;
+import java.lang.invoke.VarHandle;
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.Map.Entry;
import java.util.Optional;
import java.util.concurrent.CancellationException;
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.Map.Entry;
import java.util.Optional;
import java.util.concurrent.CancellationException;
-import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import java.util.function.Function;
import org.checkerframework.checker.lock.qual.GuardedBy;
import org.checkerframework.checker.lock.qual.Holding;
import org.eclipse.jdt.annotation.NonNull;
import java.util.function.Function;
import org.checkerframework.checker.lock.qual.GuardedBy;
import org.checkerframework.checker.lock.qual.Holding;
import org.eclipse.jdt.annotation.NonNull;
+import org.eclipse.jdt.annotation.Nullable;
import org.opendaylight.mdsal.common.api.CommitInfo;
import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
import org.opendaylight.mdsal.dom.api.DOMDataTreeReadTransaction;
import org.opendaylight.mdsal.common.api.CommitInfo;
import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
import org.opendaylight.mdsal.dom.api.DOMDataTreeReadTransaction;
@GuardedBy("this")
private Entry<PingPongTransaction, Throwable> deadTx;
@GuardedBy("this")
private Entry<PingPongTransaction, Throwable> deadTx;
- // This updater is used to manipulate the "ready" transaction. We perform only atomic get-and-set on it.
- private static final AtomicReferenceFieldUpdater<PingPongTransactionChain, PingPongTransaction> READY_UPDATER =
- AtomicReferenceFieldUpdater.newUpdater(PingPongTransactionChain.class, PingPongTransaction.class,
- "readyTx");
+ // This VarHandle is used to manipulate the "ready" transaction. We perform only atomic get-and-set on it.
+ private static final VarHandle READY_TX;
+ @SuppressWarnings("unused")
private volatile PingPongTransaction readyTx;
/*
private volatile PingPongTransaction readyTx;
/*
- * This updater is used to manipulate the "locked" transaction. A locked transaction means we know that the user
+ * This VarHandle is used to manipulate the "locked" transaction. A locked transaction means we know that the user
* still holds a transaction and should at some point call us. We perform on compare-and-swap to ensure we properly
* detect when a user is attempting to allocated multiple transactions concurrently.
*/
* still holds a transaction and should at some point call us. We perform on compare-and-swap to ensure we properly
* detect when a user is attempting to allocated multiple transactions concurrently.
*/
- private static final AtomicReferenceFieldUpdater<PingPongTransactionChain, PingPongTransaction> LOCKED_UPDATER =
- AtomicReferenceFieldUpdater.newUpdater(PingPongTransactionChain.class, PingPongTransaction.class,
- "lockedTx");
+ private static final VarHandle LOCKED_TX;
private volatile PingPongTransaction lockedTx;
/*
* This updater is used to manipulate the "inflight" transaction. There can be at most one of these at any given
* time. We perform only compare-and-swap on these.
*/
private volatile PingPongTransaction lockedTx;
/*
* This updater is used to manipulate the "inflight" transaction. There can be at most one of these at any given
* time. We perform only compare-and-swap on these.
*/
- private static final AtomicReferenceFieldUpdater<PingPongTransactionChain, PingPongTransaction> INFLIGHT_UPDATER =
- AtomicReferenceFieldUpdater.newUpdater(PingPongTransactionChain.class, PingPongTransaction.class,
- "inflightTx");
+ private static final VarHandle INFLIGHT_TX;
private volatile PingPongTransaction inflightTx;
private volatile PingPongTransaction inflightTx;
+ static {
+ final var lookup = MethodHandles.lookup();
+ try {
+ INFLIGHT_TX = lookup.findVarHandle(PingPongTransactionChain.class, "inflightTx", PingPongTransaction.class);
+ LOCKED_TX = lookup.findVarHandle(PingPongTransactionChain.class, "lockedTx", PingPongTransaction.class);
+ READY_TX = lookup.findVarHandle(PingPongTransactionChain.class, "readyTx", PingPongTransaction.class);
+ } catch (NoSuchFieldException | IllegalAccessException e) {
+ throw new ExceptionInInitializerError(e);
+ }
+ }
+
public PingPongTransactionChain(final Function<DOMTransactionChainListener, DOMTransactionChain> delegateFactory,
final DOMTransactionChainListener listener) {
this.listener = requireNonNull(listener);
public PingPongTransactionChain(final Function<DOMTransactionChainListener, DOMTransactionChain> delegateFactory,
final DOMTransactionChainListener listener) {
this.listener = requireNonNull(listener);
final DOMDataTreeReadWriteTransaction delegateTx = delegate.newReadWriteTransaction();
final PingPongTransaction newTx = new PingPongTransaction(delegateTx);
final DOMDataTreeReadWriteTransaction delegateTx = delegate.newReadWriteTransaction();
final PingPongTransaction newTx = new PingPongTransaction(delegateTx);
- if (!LOCKED_UPDATER.compareAndSet(this, null, newTx)) {
+ final Object witness = LOCKED_TX.compareAndExchange(this, null, newTx);
+ if (witness != null) {
delegateTx.cancel();
throw new IllegalStateException(
delegateTx.cancel();
throw new IllegalStateException(
- String.format("New transaction %s raced with transaction %s", newTx, lockedTx));
+ String.format("New transaction %s raced with transaction %s", newTx, witness));
+ private @Nullable PingPongTransaction acquireReadyTx() {
+ return (PingPongTransaction) READY_TX.getAndSet(this, null);
+ }
+
private @NonNull PingPongTransaction allocateTransaction() {
// Step 1: acquire current state
private @NonNull PingPongTransaction allocateTransaction() {
// Step 1: acquire current state
- final PingPongTransaction oldTx = READY_UPDATER.getAndSet(this, null);
+ final PingPongTransaction oldTx = acquireReadyTx();
// Slow path: allocate a delegate transaction
if (oldTx == null) {
// Slow path: allocate a delegate transaction
if (oldTx == null) {
}
// Fast path: reuse current transaction. We will check failures and similar on commit().
}
// Fast path: reuse current transaction. We will check failures and similar on commit().
- if (!LOCKED_UPDATER.compareAndSet(this, null, oldTx)) {
+ final Object witness = LOCKED_TX.compareAndExchange(this, null, oldTx);
+ if (witness != null) {
// Ouch. Delegate chain has not detected a duplicate transaction allocation. This is the best we can do.
oldTx.getTransaction().cancel();
throw new IllegalStateException(String.format("Reusable transaction %s raced with transaction %s", oldTx,
// Ouch. Delegate chain has not detected a duplicate transaction allocation. This is the best we can do.
oldTx.getTransaction().cancel();
throw new IllegalStateException(String.format("Reusable transaction %s raced with transaction %s", oldTx,
@Holding("this")
private void processIfReady() {
if (inflightTx == null) {
@Holding("this")
private void processIfReady() {
if (inflightTx == null) {
- final PingPongTransaction tx = READY_UPDATER.getAndSet(this, null);
+ final PingPongTransaction tx = acquireReadyTx();
if (tx != null) {
processTransaction(tx);
}
if (tx != null) {
processTransaction(tx);
}
}
LOG.debug("Submitting transaction {}", tx);
}
LOG.debug("Submitting transaction {}", tx);
- if (!INFLIGHT_UPDATER.compareAndSet(this, null, tx)) {
- LOG.warn("Submitting transaction {} while {} is still running", tx, inflightTx);
+ final Object witness = INFLIGHT_TX.compareAndExchange(this, null, tx);
+ if (witness != null) {
+ LOG.warn("Submitting transaction {} while {} is still running", tx, witness);
}
tx.getTransaction().commit().addCallback(new FutureCallback<CommitInfo>() {
}
tx.getTransaction().commit().addCallback(new FutureCallback<CommitInfo>() {
* correctness.
*/
private synchronized void processNextTransaction(final PingPongTransaction tx) {
* correctness.
*/
private synchronized void processNextTransaction(final PingPongTransaction tx) {
- if (!INFLIGHT_UPDATER.compareAndSet(this, tx, null)) {
- throw new IllegalStateException(String.format("Completed transaction %s while %s was submitted", tx,
- inflightTx));
- }
+ final Object witness = INFLIGHT_TX.compareAndExchange(this, tx, null);
+ checkState(witness == tx, "Completed transaction %s while %s was submitted", tx, witness);
- final PingPongTransaction nextTx = READY_UPDATER.getAndSet(this, null);
+ final PingPongTransaction nextTx = acquireReadyTx();
if (nextTx == null) {
final PingPongTransaction local = shutdownTx;
if (local != null) {
if (nextTx == null) {
final PingPongTransaction local = shutdownTx;
if (local != null) {
void readyTransaction(final @NonNull PingPongTransaction tx) {
// First mark the transaction as not locked.
void readyTransaction(final @NonNull PingPongTransaction tx) {
// First mark the transaction as not locked.
- if (!LOCKED_UPDATER.compareAndSet(this, tx, null)) {
- throw new IllegalStateException(String.format("Attempted to submit transaction %s while we have %s", tx,
- lockedTx));
- }
+ final Object lockedWitness = LOCKED_TX.compareAndExchange(this, tx, null);
+ checkState(lockedWitness == tx, "Attempted to submit transaction %s while we have %s", tx, lockedWitness);
LOG.debug("Transaction {} unlocked", tx);
/*
* The transaction is ready. It will then be picked up by either next allocation,
* or a background transaction completion callback.
*/
LOG.debug("Transaction {} unlocked", tx);
/*
* The transaction is ready. It will then be picked up by either next allocation,
* or a background transaction completion callback.
*/
- if (!READY_UPDATER.compareAndSet(this, null, tx)) {
- throw new IllegalStateException(String.format("Transaction %s collided on ready state with %s", tx,
- readyTx));
- }
+ final Object readyWitness = READY_TX.compareAndExchange(this, null, tx);
+ checkState(readyWitness == null, "Transaction %s collided on ready state with %s", tx, readyWitness);
LOG.debug("Transaction {} readied", tx);
/*
LOG.debug("Transaction {} readied", tx);
/*
synchronized void cancelTransaction(final PingPongTransaction tx,
final DOMDataTreeReadWriteTransaction frontendTx) {
// Attempt to unlock the operation.
synchronized void cancelTransaction(final PingPongTransaction tx,
final DOMDataTreeReadWriteTransaction frontendTx) {
// Attempt to unlock the operation.
- if (!LOCKED_UPDATER.compareAndSet(this, tx, null)) {
- throw new VerifyException(String.format("Cancelling transaction %s collided with locked transaction %s", tx,
- lockedTx));
- }
+ final Object witness = LOCKED_TX.compareAndExchange(this, tx, null);
+ verify(witness == tx, "Cancelling transaction %s collided with locked transaction %s", tx, witness);
// Cancel the backend transaction, so we do not end up leaking it.
final boolean backendCancelled = tx.getTransaction().cancel();
// Cancel the backend transaction, so we do not end up leaking it.
final boolean backendCancelled = tx.getTransaction().cancel();
}
// Force allocations on slow path, picking up a potentially-outstanding transaction
}
// Force allocations on slow path, picking up a potentially-outstanding transaction
- final PingPongTransaction tx = READY_UPDATER.getAndSet(this, null);
+ final PingPongTransaction tx = acquireReadyTx();
if (tx != null) {
// We have one more transaction, which needs to be processed somewhere. If we do not
if (tx != null) {
// We have one more transaction, which needs to be processed somewhere. If we do not