X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatabroker%2Factors%2Fdds%2FClientTransaction.java;h=f5855c279a9e4e9f648b93a047c1d9cdc3c4a1cc;hb=27e0cf91aaba3d2a9f9e3f33e31f0faece502cd4;hp=abb134526954f90a63c7f394ae6a91bba887c5cb;hpb=db9a673c114febc785fbd324947ac2c3e3095d06;p=controller.git diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/ClientTransaction.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/ClientTransaction.java index abb1345269..f5855c279a 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/ClientTransaction.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/ClientTransaction.java @@ -7,25 +7,19 @@ */ package org.opendaylight.controller.cluster.databroker.actors.dds; +import static com.google.common.base.Preconditions.checkState; + import com.google.common.annotations.Beta; -import com.google.common.base.Optional; -import com.google.common.base.Preconditions; -import com.google.common.collect.Iterables; -import com.google.common.util.concurrent.CheckedFuture; -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; +import com.google.common.util.concurrent.FluentFuture; +import java.util.Collection; +import java.util.Optional; import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier; -import org.opendaylight.mdsal.common.api.ReadFailedException; import org.opendaylight.mdsal.dom.spi.store.DOMStoreThreePhaseCommitCohort; -import org.opendaylight.yangtools.concepts.Identifiable; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; /** - * Client-side view of a free-standing transaction. + * Client-side view of a transaction. * *

* This interface is used by the world outside of the actor system and in the actor system it is manifested via @@ -55,127 +49,58 @@ import org.slf4j.LoggerFactory; * @author Robert Varga */ @Beta -public final class ClientTransaction extends LocalAbortable implements Identifiable { - private static final Logger LOG = LoggerFactory.getLogger(ClientTransaction.class); - private static final AtomicIntegerFieldUpdater STATE_UPDATER = - AtomicIntegerFieldUpdater.newUpdater(ClientTransaction.class, "state"); - private static final int OPEN_STATE = 0; - private static final int CLOSED_STATE = 1; - - private final Map proxies = new ConcurrentHashMap<>(); - private final TransactionIdentifier transactionId; - private final AbstractClientHistory parent; - - private volatile int state = OPEN_STATE; - +public class ClientTransaction extends AbstractClientHandle { ClientTransaction(final AbstractClientHistory parent, final TransactionIdentifier transactionId) { - this.transactionId = Preconditions.checkNotNull(transactionId); - this.parent = Preconditions.checkNotNull(parent); - } - - private void checkNotClosed() { - Preconditions.checkState(state == OPEN_STATE, "Transaction %s is closed", transactionId); - } - - private AbstractProxyTransaction createProxy(final Long shard) { - return parent.createTransactionProxy(transactionId, shard); + super(parent, transactionId); } - private AbstractProxyTransaction ensureProxy(final YangInstanceIdentifier path) { - checkNotClosed(); - - final Long shard = parent.resolveShardForPath(path); - return proxies.computeIfAbsent(shard, this::createProxy); + private AbstractProxyTransaction ensureTransactionProxy(final YangInstanceIdentifier path) { + return ensureProxy(path); } - @Override - public TransactionIdentifier getIdentifier() { - return transactionId; + public FluentFuture exists(final YangInstanceIdentifier path) { + return ensureTransactionProxy(path).exists(path); } - public CheckedFuture exists(final YangInstanceIdentifier path) { - return ensureProxy(path).exists(path); - } - - public CheckedFuture>, ReadFailedException> read(final YangInstanceIdentifier path) { - return ensureProxy(path).read(path); + public FluentFuture> read(final YangInstanceIdentifier path) { + return ensureTransactionProxy(path).read(path); } public void delete(final YangInstanceIdentifier path) { - ensureProxy(path).delete(path); - } - - public void merge(final YangInstanceIdentifier path, final NormalizedNode data) { - ensureProxy(path).merge(path, data); + ensureTransactionProxy(path).delete(path); } - public void write(final YangInstanceIdentifier path, final NormalizedNode data) { - ensureProxy(path).write(path, data); + public void merge(final YangInstanceIdentifier path, final NormalizedNode data) { + ensureTransactionProxy(path).merge(path, data); } - private boolean ensureClosed() { - final int local = state; - if (local == CLOSED_STATE) { - return false; - } - - final boolean success = STATE_UPDATER.compareAndSet(this, OPEN_STATE, CLOSED_STATE); - Preconditions.checkState(success, "Transaction %s raced during close", this); - return true; + public void write(final YangInstanceIdentifier path, final NormalizedNode data) { + ensureTransactionProxy(path).write(path, data); } public DOMStoreThreePhaseCommitCohort ready() { - Preconditions.checkState(ensureClosed(), "Attempted to submit a closed transaction %s", this); - - for (AbstractProxyTransaction p : proxies.values()) { - p.seal(); - } + final Collection toReady = ensureClosed(); + checkState(toReady != null, "Attempted to submit a closed transaction %s", this); + toReady.forEach(AbstractProxyTransaction::seal); final AbstractTransactionCommitCohort cohort; - switch (proxies.size()) { + switch (toReady.size()) { case 0: - cohort = new EmptyTransactionCommitCohort(parent, transactionId); + cohort = new EmptyTransactionCommitCohort(parent(), getIdentifier()); break; case 1: - cohort = new DirectTransactionCommitCohort(parent, transactionId, - Iterables.getOnlyElement(proxies.values())); + cohort = new DirectTransactionCommitCohort(parent(), getIdentifier(), toReady.iterator().next()); break; default: - cohort = new ClientTransactionCommitCohort(parent, transactionId, proxies.values()); + cohort = new ClientTransactionCommitCohort(parent(), getIdentifier(), toReady); break; } - return parent.onTransactionReady(transactionId, cohort); - } - - /** - * Release all state associated with this transaction. - */ - public void abort() { - if (commonAbort()) { - parent.onTransactionAbort(transactionId); - } - } - - private boolean commonAbort() { - if (!ensureClosed()) { - return false; - } - - for (AbstractProxyTransaction proxy : proxies.values()) { - proxy.abort(); - } - proxies.clear(); - return true; + return parent().onTransactionReady(this, cohort); } @Override - void localAbort(final Throwable cause) { - LOG.debug("Local abort of transaction {}", getIdentifier(), cause); - commonAbort(); - } - - Map getProxies() { - return proxies; + final AbstractProxyTransaction createProxy(final Long shard) { + return parent().createTransactionProxy(getIdentifier(), shard); } }