X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-dom-broker%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fmd%2Fsal%2Fdom%2Fbroker%2Fimpl%2FPingPongTransactionChain.java;h=8cda1d53f9678baa5145789dd8f82978fa1796dd;hb=84d6864d26fddddd92da32fd00d57c7224d4213d;hp=6097b38bb223254fafb76509a903e604ac5b6209;hpb=7dd1463c557f712b6791d62d6103b573d517951b;p=controller.git diff --git a/opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/md/sal/dom/broker/impl/PingPongTransactionChain.java b/opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/md/sal/dom/broker/impl/PingPongTransactionChain.java index 6097b38bb2..8cda1d53f9 100644 --- a/opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/md/sal/dom/broker/impl/PingPongTransactionChain.java +++ b/opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/md/sal/dom/broker/impl/PingPongTransactionChain.java @@ -11,16 +11,15 @@ import com.google.common.base.Optional; import com.google.common.base.Preconditions; import com.google.common.base.Verify; import com.google.common.util.concurrent.CheckedFuture; +import com.google.common.util.concurrent.FluentFuture; import com.google.common.util.concurrent.FutureCallback; -import com.google.common.util.concurrent.Futures; -import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.MoreExecutors; import java.util.AbstractMap.SimpleImmutableEntry; import java.util.Map.Entry; import java.util.concurrent.CancellationException; import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; import javax.annotation.Nonnull; import javax.annotation.concurrent.GuardedBy; -import org.opendaylight.controller.md.sal.common.api.TransactionStatus; import org.opendaylight.controller.md.sal.common.api.data.AsyncTransaction; import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType; import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException; @@ -33,7 +32,8 @@ import org.opendaylight.controller.md.sal.dom.api.DOMDataReadWriteTransaction; import org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction; import org.opendaylight.controller.md.sal.dom.api.DOMTransactionChain; import org.opendaylight.controller.md.sal.dom.spi.ForwardingDOMDataReadWriteTransaction; -import org.opendaylight.yangtools.yang.common.RpcResult; +import org.opendaylight.mdsal.common.api.CommitInfo; +import org.opendaylight.mdsal.common.api.MappingCheckedFuture; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; import org.slf4j.Logger; @@ -48,9 +48,11 @@ import org.slf4j.LoggerFactory; * the committing transaction completes successfully, the scratch transaction * is enqueued as soon as it is ready. * + *

* This mode of operation means that there is no inherent isolation between * the front-end transactions and transactions cannot be reasonably cancelled. * + *

* It furthermore means that the transactions returned by {@link #newReadOnlyTransaction()} * counts as an outstanding transaction and the user may not allocate multiple * read-only transactions at the same time. @@ -71,8 +73,9 @@ public final class PingPongTransactionChain implements DOMTransactionChain { * This updater is used to manipulate the "ready" transaction. We perform only atomic * get-and-set on it. */ - private static final AtomicReferenceFieldUpdater READY_UPDATER = - AtomicReferenceFieldUpdater.newUpdater(PingPongTransactionChain.class, PingPongTransaction.class, "readyTx"); + private static final AtomicReferenceFieldUpdater READY_UPDATER + = AtomicReferenceFieldUpdater + .newUpdater(PingPongTransactionChain.class, PingPongTransaction.class, "readyTx"); private volatile PingPongTransaction readyTx; /** @@ -81,16 +84,18 @@ public final class PingPongTransactionChain implements DOMTransactionChain { * us. We perform on compare-and-swap to ensure we properly detect when a user is * attempting to allocated multiple transactions concurrently. */ - private static final AtomicReferenceFieldUpdater LOCKED_UPDATER = - AtomicReferenceFieldUpdater.newUpdater(PingPongTransactionChain.class, PingPongTransaction.class, "lockedTx"); + private static final AtomicReferenceFieldUpdater LOCKED_UPDATER + = AtomicReferenceFieldUpdater + .newUpdater(PingPongTransactionChain.class, PingPongTransaction.class, "lockedTx"); private volatile PingPongTransaction lockedTx; /** * This updater is used to manipulate the "inflight" transaction. There can be at most * one of these at any given time. We perform only compare-and-swap on these. */ - private static final AtomicReferenceFieldUpdater INFLIGHT_UPDATER = - AtomicReferenceFieldUpdater.newUpdater(PingPongTransactionChain.class, PingPongTransaction.class, "inflightTx"); + private static final AtomicReferenceFieldUpdater INFLIGHT_UPDATER + = AtomicReferenceFieldUpdater + .newUpdater(PingPongTransactionChain.class, PingPongTransaction.class, "inflightTx"); private volatile PingPongTransaction inflightTx; PingPongTransactionChain(final DOMDataBroker broker, final TransactionChainListener listener) { @@ -98,7 +103,7 @@ public final class PingPongTransactionChain implements DOMTransactionChain { this.delegate = broker.createTransactionChain(new TransactionChainListener() { @Override public void onTransactionChainFailed(final TransactionChain chain, - final AsyncTransaction transaction, final Throwable cause) { + final AsyncTransaction transaction, final Throwable cause) { LOG.debug("Transaction chain {} reported failure in {}", chain, transaction, cause); delegateFailed(chain, cause); } @@ -163,9 +168,9 @@ public final class PingPongTransactionChain implements DOMTransactionChain { Preconditions.checkState(shutdownTx == null, "Transaction chain %s has been shut down", this); if (deadTx != null) { - throw new IllegalStateException(String.format( - "Transaction chain %s has failed due to transaction %s being canceled", this, deadTx.getKey()), - deadTx.getValue()); + throw new IllegalStateException( + String.format("Transaction chain %s has failed due to transaction %s being canceled", this, + deadTx.getKey()), deadTx.getValue()); } final DOMDataReadWriteTransaction delegateTx = delegate.newReadWriteTransaction(); @@ -173,7 +178,8 @@ public final class PingPongTransactionChain implements DOMTransactionChain { if (!LOCKED_UPDATER.compareAndSet(this, null, newTx)) { delegateTx.cancel(); - throw new IllegalStateException(String.format("New transaction %s raced with transacion %s", newTx, lockedTx)); + throw new IllegalStateException( + String.format("New transaction %s raced with transaction %s", newTx, lockedTx)); } return newTx; @@ -188,11 +194,12 @@ public final class PingPongTransactionChain implements DOMTransactionChain { return slowAllocateTransaction(); } - // Fast path: reuse current transaction. We will check failures and similar on submit(). + // Fast path: reuse current transaction. We will check failures and similar on commit(). if (!LOCKED_UPDATER.compareAndSet(this, null, oldTx)) { // Ouch. Delegate chain has not detected a duplicate transaction allocation. This is the best we can do. oldTx.getTransaction().cancel(); - throw new IllegalStateException(String.format("Reusable transaction %s raced with transaction %s", oldTx, lockedTx)); + throw new IllegalStateException( + String.format("Reusable transaction %s raced with transaction %s", oldTx, lockedTx)); } return oldTx; @@ -232,17 +239,17 @@ public final class PingPongTransactionChain implements DOMTransactionChain { LOG.warn("Submitting transaction {} while {} is still running", tx, inflightTx); } - Futures.addCallback(tx.getTransaction().submit(), new FutureCallback() { + tx.getTransaction().commit().addCallback(new FutureCallback() { @Override - public void onSuccess(final Void result) { + public void onSuccess(final CommitInfo result) { transactionSuccessful(tx, result); } @Override - public void onFailure(final Throwable t) { - transactionFailed(tx, t); + public void onFailure(final Throwable throwable) { + transactionFailed(tx, throwable); } - }); + }, MoreExecutors.directExecutor()); } /* @@ -280,17 +287,17 @@ public final class PingPongTransactionChain implements DOMTransactionChain { } } - void transactionSuccessful(final PingPongTransaction tx, final Void result) { + void transactionSuccessful(final PingPongTransaction tx, final CommitInfo result) { LOG.debug("Transaction {} completed successfully", tx); tx.onSuccess(result); processNextTransaction(tx); } - void transactionFailed(final PingPongTransaction tx, final Throwable t) { - LOG.debug("Transaction {} failed", tx, t); + void transactionFailed(final PingPongTransaction tx, final Throwable throwable) { + LOG.debug("Transaction {} failed", tx, throwable); - tx.onFailure(t); + tx.onFailure(throwable); processNextTransaction(tx); } @@ -326,15 +333,14 @@ public final class PingPongTransactionChain implements DOMTransactionChain { * and return false for everything else. Cancelling such a transaction will result in all transactions in the * batch to be cancelled. * - * @param tx Backend shared transaction - * @param frontendTx - * @param isOpen indicator whether the transaction was already closed - * @return True if cancellation succeeded, false otherwise + * @param tx Backend shared transaction + * @param frontendTx transaction + * @param isOpen indicator whether the transaction was already closed */ synchronized void cancelTransaction(final PingPongTransaction tx, final DOMDataReadWriteTransaction frontendTx) { // Attempt to unlock the operation. final boolean lockedMatch = LOCKED_UPDATER.compareAndSet(this, tx, null); - Verify.verify(lockedMatch, "Cancelling transaction {} collided with locked transaction {}", tx, lockedTx); + Verify.verify(lockedMatch, "Cancelling transaction %s collided with locked transaction %s", tx, lockedTx); // Cancel the backend transaction, so we do not end up leaking it. final boolean backendCancelled = tx.getTransaction().cancel(); @@ -362,15 +368,16 @@ public final class PingPongTransactionChain implements DOMTransactionChain { // transaction chain, too. Since we just came off of a locked transaction, we do not have a ready transaction // at the moment, but there may be some transaction in-flight. So we proceed to shutdown the backend chain // and mark the fact that we should be turning its completion into a failure. - deadTx = new SimpleImmutableEntry<>(tx, - new CancellationException("Transaction " + frontendTx + " canceled").fillInStackTrace()); + deadTx = new SimpleImmutableEntry<>(tx, new CancellationException("Transaction " + frontendTx + " canceled") + .fillInStackTrace()); delegate.close(); } @Override public synchronized void close() { final PingPongTransaction notLocked = lockedTx; - Preconditions.checkState(notLocked == null, "Attempted to close chain with outstanding transaction %s", notLocked); + Preconditions + .checkState(notLocked == null, "Attempted to close chain with outstanding transaction %s", notLocked); // This is not reliable, but if we observe it to be null and the process has already completed, // the backend transaction chain will throw the appropriate error. @@ -409,14 +416,14 @@ public final class PingPongTransactionChain implements DOMTransactionChain { return new DOMDataReadOnlyTransaction() { @Override - public CheckedFuture>, ReadFailedException> read(final LogicalDatastoreType store, - final YangInstanceIdentifier path) { + public CheckedFuture>, ReadFailedException> read( + final LogicalDatastoreType store, final YangInstanceIdentifier path) { return tx.getTransaction().read(store, path); } @Override public CheckedFuture exists(final LogicalDatastoreType store, - final YangInstanceIdentifier path) { + final YangInstanceIdentifier path) { return tx.getTransaction().exists(store, path); } @@ -445,17 +452,16 @@ public final class PingPongTransactionChain implements DOMTransactionChain { @Override public CheckedFuture submit() { - readyTransaction(tx); - isOpen = false; - return tx.getSubmitFuture(); + return MappingCheckedFuture.create(commit().transform(ignored -> null, + MoreExecutors.directExecutor()), TransactionCommitFailedExceptionMapper.COMMIT_ERROR_MAPPER); } - @Deprecated @Override - public ListenableFuture> commit() { + public FluentFuture commit() { readyTransaction(tx); isOpen = false; - return tx.getCommitFuture(); + return FluentFuture.from(tx.getSubmitFuture()).transformAsync( + ignored -> CommitInfo.emptyFluentFuture(), MoreExecutors.directExecutor()); } @Override