X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-dom-broker%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fmd%2Fsal%2Fdom%2Fbroker%2Fimpl%2FDOMDataBrokerTransactionChainImpl.java;h=77387c761cd6b26efea4c05471eb9632c695c578;hp=7cd6afa466e7d57b57f6861ac12aabf30bf90347;hb=32715b020aca7945bc22476138f20ef7c78a3620;hpb=8148d0748c1578d928fde08f2875b65f9b09e04a diff --git a/opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/md/sal/dom/broker/impl/DOMDataBrokerTransactionChainImpl.java b/opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/md/sal/dom/broker/impl/DOMDataBrokerTransactionChainImpl.java index 7cd6afa466..77387c761c 100644 --- a/opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/md/sal/dom/broker/impl/DOMDataBrokerTransactionChainImpl.java +++ b/opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/md/sal/dom/broker/impl/DOMDataBrokerTransactionChainImpl.java @@ -6,11 +6,14 @@ */ package org.opendaylight.controller.md.sal.dom.broker.impl; -import com.google.common.base.Optional; import com.google.common.base.Preconditions; import com.google.common.util.concurrent.CheckedFuture; +import com.google.common.util.concurrent.FutureCallback; +import com.google.common.util.concurrent.Futures; import java.util.Map; +import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType; import org.opendaylight.controller.md.sal.common.api.data.TransactionChainListener; import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException; @@ -27,16 +30,27 @@ import org.slf4j.LoggerFactory; * {@link org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType} type. * */ -public class DOMDataBrokerTransactionChainImpl extends AbstractDOMForwardedTransactionFactory - implements DOMTransactionChain, DOMDataCommitErrorListener { +final class DOMDataBrokerTransactionChainImpl extends AbstractDOMForwardedTransactionFactory + implements DOMTransactionChain { + private static enum State { + RUNNING, + CLOSING, + CLOSED, + FAILED, + } + private static final AtomicIntegerFieldUpdater COUNTER_UPDATER = + AtomicIntegerFieldUpdater.newUpdater(DOMDataBrokerTransactionChainImpl.class, "counter"); + private static final AtomicReferenceFieldUpdater STATE_UPDATER = + AtomicReferenceFieldUpdater.newUpdater(DOMDataBrokerTransactionChainImpl.class, State.class, "state"); private static final Logger LOG = LoggerFactory.getLogger(DOMDataBrokerTransactionChainImpl.class); private final AtomicLong txNum = new AtomicLong(); - private final DOMDataCommitExecutor coordinator; + private final AbstractDOMDataBroker broker; private final TransactionChainListener listener; private final long chainId; - private volatile boolean failed = false; + private volatile State state = State.RUNNING; + private volatile int counter = 0; /** * @@ -55,44 +69,77 @@ public class DOMDataBrokerTransactionChainImpl extends AbstractDOMForwardedTrans */ public DOMDataBrokerTransactionChainImpl(final long chainId, final Map chains, - final DOMDataCommitExecutor coordinator, final TransactionChainListener listener) { + final AbstractDOMDataBroker broker, final TransactionChainListener listener) { super(chains); this.chainId = chainId; - this.coordinator = Preconditions.checkNotNull(coordinator); + this.broker = Preconditions.checkNotNull(broker); this.listener = Preconditions.checkNotNull(listener); } + private void checkNotFailed() { + Preconditions.checkState(state != State.FAILED, "Transaction chain has failed"); + } + @Override protected Object newTransactionIdentifier() { return "DOM-CHAIN-" + chainId + "-" + txNum.getAndIncrement(); } @Override - public CheckedFuture submit( + public CheckedFuture submit( final DOMDataWriteTransaction transaction, final Iterable cohorts) { + checkNotFailed(); checkNotClosed(); - return coordinator.submit(transaction, cohorts, Optional. of(this)); + final CheckedFuture ret = broker.submit(transaction, cohorts); + + COUNTER_UPDATER.incrementAndGet(this); + Futures.addCallback(ret, new FutureCallback() { + @Override + public void onSuccess(final Void result) { + transactionCompleted(); + } + + @Override + public void onFailure(final Throwable t) { + transactionFailed(transaction, t); + } + }); + + return ret; } @Override public void close() { - super.close(); + final boolean success = STATE_UPDATER.compareAndSet(this, State.RUNNING, State.CLOSING); + if (!success) { + LOG.debug("Chain {} is no longer running", this); + return; + } + super.close(); for (DOMStoreTransactionChain subChain : getTxFactories().values()) { subChain.close(); } - if (!failed) { - LOG.debug("Transaction chain {} successfully finished.", this); - // FIXME: this event should be emitted once all operations complete - listener.onTransactionChainSuccessful(this); + if (counter == 0) { + finishClose(); } } - @Override - public void onCommitFailed(final DOMDataWriteTransaction tx, final Throwable cause) { - failed = true; + private void finishClose() { + state = State.CLOSED; + listener.onTransactionChainSuccessful(this); + } + + private void transactionCompleted() { + if (COUNTER_UPDATER.decrementAndGet(this) == 0 && state == State.CLOSING) { + finishClose(); + } + } + + private void transactionFailed(final DOMDataWriteTransaction tx, final Throwable cause) { + state = State.FAILED; LOG.debug("Transaction chain {} failed.", this, cause); listener.onTransactionChainFailed(this, tx, cause); }