X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatabroker%2Factors%2Fdds%2FClientTransaction.java;h=334ab71d585d2f4295b8d78b2e5094fff839973e;hb=d6ed0a044d591d65847714451d97d80345154089;hp=8450c67224fb7625bffbf64d3d4741737d3049db;hpb=320a4e5cd2d9d80468a3f82798744f2035488218;p=controller.git diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/ClientTransaction.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/ClientTransaction.java index 8450c67224..334ab71d58 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/ClientTransaction.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/ClientTransaction.java @@ -12,20 +12,15 @@ import com.google.common.base.Optional; import com.google.common.base.Preconditions; import com.google.common.collect.Iterables; import com.google.common.util.concurrent.CheckedFuture; -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; +import java.util.Collection; import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier; import org.opendaylight.mdsal.common.api.ReadFailedException; import org.opendaylight.mdsal.dom.spi.store.DOMStoreThreePhaseCommitCohort; -import org.opendaylight.yangtools.concepts.Identifiable; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; /** - * Client-side view of a free-standing transaction. + * Client-side view of a transaction. * *

* This interface is used by the world outside of the actor system and in the actor system it is manifested via @@ -55,120 +50,61 @@ import org.slf4j.LoggerFactory; * @author Robert Varga */ @Beta -public final class ClientTransaction extends LocalAbortable implements Identifiable { - private static final Logger LOG = LoggerFactory.getLogger(ClientTransaction.class); - private static final AtomicIntegerFieldUpdater STATE_UPDATER = - AtomicIntegerFieldUpdater.newUpdater(ClientTransaction.class, "state"); - private static final int OPEN_STATE = 0; - private static final int CLOSED_STATE = 1; - - private final Map proxies = new ConcurrentHashMap<>(); - private final TransactionIdentifier transactionId; - private final AbstractClientHistory parent; - - private volatile int state = OPEN_STATE; +public final class ClientTransaction extends AbstractClientHandle { ClientTransaction(final AbstractClientHistory parent, final TransactionIdentifier transactionId) { - this.transactionId = Preconditions.checkNotNull(transactionId); - this.parent = Preconditions.checkNotNull(parent); + super(parent, transactionId); } - private void checkNotClosed() { - Preconditions.checkState(state == OPEN_STATE, "Transaction %s is closed", transactionId); - } private AbstractProxyTransaction createProxy(final Long shard) { - return parent.createTransactionProxy(transactionId, shard); - } - - private AbstractProxyTransaction ensureProxy(final YangInstanceIdentifier path) { - checkNotClosed(); - - final Long shard = parent.resolveShardForPath(path); - return proxies.computeIfAbsent(shard, this::createProxy); + return parent().createTransactionProxy(getIdentifier(), shard); } - @Override - public TransactionIdentifier getIdentifier() { - return transactionId; + private AbstractProxyTransaction ensureTransactionProxy(final YangInstanceIdentifier path) { + return ensureProxy(path, this::createProxy); } public CheckedFuture exists(final YangInstanceIdentifier path) { - return ensureProxy(path).exists(path); + return ensureTransactionProxy(path).exists(path); } - public CheckedFuture>, ReadFailedException> read(final YangInstanceIdentifier path) { - return ensureProxy(path).read(path); + public CheckedFuture>, ReadFailedException> read( + final YangInstanceIdentifier path) { + return ensureTransactionProxy(path).read(path); } public void delete(final YangInstanceIdentifier path) { - ensureProxy(path).delete(path); + ensureTransactionProxy(path).delete(path); } public void merge(final YangInstanceIdentifier path, final NormalizedNode data) { - ensureProxy(path).merge(path, data); + ensureTransactionProxy(path).merge(path, data); } public void write(final YangInstanceIdentifier path, final NormalizedNode data) { - ensureProxy(path).write(path, data); - } - - private boolean ensureClosed() { - final int local = state; - if (local == CLOSED_STATE) { - return false; - } - - final boolean success = STATE_UPDATER.compareAndSet(this, OPEN_STATE, CLOSED_STATE); - Preconditions.checkState(success, "Transaction %s raced during close", this); - return true; + ensureTransactionProxy(path).write(path, data); } public DOMStoreThreePhaseCommitCohort ready() { - Preconditions.checkState(ensureClosed(), "Attempted to submit a closed transaction %s", this); - - for (AbstractProxyTransaction p : proxies.values()) { - p.seal(); - } + final Collection toReady = ensureClosed(); + Preconditions.checkState(toReady != null, "Attempted to submit a closed transaction %s", this); + toReady.forEach(AbstractProxyTransaction::seal); final AbstractTransactionCommitCohort cohort; - switch (proxies.size()) { + switch (toReady.size()) { case 0: - cohort = new EmptyTransactionCommitCohort(parent, transactionId); + cohort = new EmptyTransactionCommitCohort(parent(), getIdentifier()); break; case 1: - cohort = new DirectTransactionCommitCohort(parent, transactionId, - Iterables.getOnlyElement(proxies.values())); + cohort = new DirectTransactionCommitCohort(parent(), getIdentifier(), + Iterables.getOnlyElement(toReady)); break; default: - cohort = new ClientTransactionCommitCohort(parent, transactionId, proxies.values()); + cohort = new ClientTransactionCommitCohort(parent(), getIdentifier(), toReady); break; } - return parent.onTransactionReady(transactionId, cohort); - } - - /** - * Release all state associated with this transaction. - */ - public void abort() { - if (ensureClosed()) { - for (AbstractProxyTransaction proxy : proxies.values()) { - proxy.abort(); - } - proxies.clear(); - - parent.onTransactionAbort(transactionId); - } - } - - @Override - void localAbort(final Throwable cause) { - LOG.debug("Aborting transaction {}", getIdentifier(), cause); - abort(); - } - - Map getProxies() { - return proxies; + return parent().onTransactionReady(this, cohort); } }