X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatabroker%2Factors%2Fdds%2FClientTransaction.java;h=4fbb49a61d3f23888c99e2d4f258a8601c8dc091;hp=81d00ee8bce3279842696f6a912d4744423a2e47;hb=99f80f27bee37bb23e345420bf14bb7bb4793c28;hpb=cc1ec4a8e2ec99ad7711d0e5e649b34d37d87da0 diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/ClientTransaction.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/ClientTransaction.java index 81d00ee8bc..4fbb49a61d 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/ClientTransaction.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/ClientTransaction.java @@ -7,25 +7,24 @@ */ package org.opendaylight.controller.cluster.databroker.actors.dds; +import static com.google.common.base.Preconditions.checkState; + import com.google.common.annotations.Beta; -import com.google.common.base.Optional; -import com.google.common.base.Preconditions; -import com.google.common.collect.Iterables; -import com.google.common.util.concurrent.CheckedFuture; -import java.util.HashMap; +import com.google.common.util.concurrent.FluentFuture; +import java.util.Collection; import java.util.Map; -import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; +import java.util.Optional; +import org.eclipse.jdt.annotation.NonNull; import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier; -import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException; -import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort; -import org.opendaylight.yangtools.concepts.Identifiable; +import org.opendaylight.controller.cluster.datastore.utils.RootScatterGather; +import org.opendaylight.mdsal.dom.spi.store.DOMStoreThreePhaseCommitCohort; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; +import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.PathArgument; +import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode; import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; /** - * Client-side view of a free-standing transaction. + * Client-side view of a transaction. * *

* This interface is used by the world outside of the actor system and in the actor system it is manifested via @@ -55,121 +54,93 @@ import org.slf4j.LoggerFactory; * @author Robert Varga */ @Beta -public final class ClientTransaction extends LocalAbortable implements Identifiable { - private static final Logger LOG = LoggerFactory.getLogger(ClientTransaction.class); - private static final AtomicIntegerFieldUpdater STATE_UPDATER = - AtomicIntegerFieldUpdater.newUpdater(ClientTransaction.class, "state"); - private static final int OPEN_STATE = 0; - private static final int CLOSED_STATE = 1; - - private final Map proxies = new HashMap<>(); - private final TransactionIdentifier transactionId; - private final AbstractClientHistory parent; - - private volatile int state = OPEN_STATE; - +public class ClientTransaction extends AbstractClientHandle { ClientTransaction(final AbstractClientHistory parent, final TransactionIdentifier transactionId) { - this.transactionId = Preconditions.checkNotNull(transactionId); - this.parent = Preconditions.checkNotNull(parent); + super(parent, transactionId); } - private void checkNotClosed() { - Preconditions.checkState(state == OPEN_STATE, "Transaction %s is closed", transactionId); - } - - private AbstractProxyTransaction createProxy(final Long shard) { - return parent.createTransactionProxy(transactionId, shard); + public FluentFuture exists(final YangInstanceIdentifier path) { + return ensureProxy(path).exists(path); } - private AbstractProxyTransaction ensureProxy(final YangInstanceIdentifier path) { - checkNotClosed(); - - final ModuleShardBackendResolver resolver = parent.getClient().resolver(); - final Long shard = resolver.resolveShardForPath(path); - return proxies.computeIfAbsent(shard, this::createProxy); + public FluentFuture> read(final YangInstanceIdentifier path) { + return path.isEmpty() ? readRoot() : ensureProxy(path).read(path); } - @Override - public TransactionIdentifier getIdentifier() { - return transactionId; + private FluentFuture> readRoot() { + return RootScatterGather.gather(parent().actorUtils(), ensureAllProxies() + .map(proxy -> proxy.read(YangInstanceIdentifier.empty()))); } - public CheckedFuture exists(final YangInstanceIdentifier path) { - return ensureProxy(path).exists(path); + public void delete(final YangInstanceIdentifier path) { + if (path.isEmpty()) { + ensureAllProxies().forEach(proxy -> proxy.delete(YangInstanceIdentifier.empty())); + } else { + ensureProxy(path).delete(path); + } } - public CheckedFuture>, ReadFailedException> read(final YangInstanceIdentifier path) { - return ensureProxy(path).read(path); + public void merge(final YangInstanceIdentifier path, final NormalizedNode data) { + if (path.isEmpty()) { + mergeRoot(RootScatterGather.castRootNode(data)); + } else { + ensureProxy(path).merge(path, data); + } } - public void delete(final YangInstanceIdentifier path) { - ensureProxy(path).delete(path); + private void mergeRoot(final @NonNull ContainerNode rootData) { + if (!rootData.isEmpty()) { + RootScatterGather.scatterTouched(rootData, this::ensureProxy).forEach( + scattered -> scattered.shard().merge(YangInstanceIdentifier.empty(), scattered.container())); + } } - public void merge(final YangInstanceIdentifier path, final NormalizedNode data) { - ensureProxy(path).merge(path, data); + public void write(final YangInstanceIdentifier path, final NormalizedNode data) { + if (path.isEmpty()) { + writeRoot(RootScatterGather.castRootNode(data)); + } else { + ensureProxy(path).write(path, data); + } } - public void write(final YangInstanceIdentifier path, final NormalizedNode data) { - ensureProxy(path).write(path, data); + private void writeRoot(final @NonNull ContainerNode rootData) { + RootScatterGather.scatterAll(rootData, this::ensureProxy, ensureAllProxies()).forEach( + scattered -> scattered.shard().write(YangInstanceIdentifier.empty(), scattered.container())); } - private boolean ensureClosed() { - final int local = state; - if (local != CLOSED_STATE) { - final boolean success = STATE_UPDATER.compareAndSet(this, OPEN_STATE, CLOSED_STATE); - Preconditions.checkState(success, "Transaction %s raced during close", this); - return true; - } else { - return false; - } + private AbstractProxyTransaction ensureProxy(final PathArgument childId) { + return ensureProxy(YangInstanceIdentifier.create(childId)); } public DOMStoreThreePhaseCommitCohort ready() { - Preconditions.checkState(ensureClosed(), "Attempted to submit a closed transaction %s", this); + final Map participants = ensureClosed(); + checkState(participants != null, "Attempted to submit a closed transaction %s", this); - for (AbstractProxyTransaction p : proxies.values()) { - p.seal(); - } + final Collection toReady = participants.values(); + toReady.forEach(AbstractProxyTransaction::seal); + + final TransactionIdentifier txId = getIdentifier(); + final AbstractClientHistory parent = parent(); + parent.onTransactionShardsBound(txId, participants.keySet()); final AbstractTransactionCommitCohort cohort; - switch (proxies.size()) { + switch (toReady.size()) { case 0: - cohort = new EmptyTransactionCommitCohort(parent, transactionId); + cohort = new EmptyTransactionCommitCohort(parent, txId); break; case 1: - cohort = new DirectTransactionCommitCohort(parent, transactionId, - Iterables.getOnlyElement(proxies.values())); + cohort = new DirectTransactionCommitCohort(parent, txId, toReady.iterator().next()); break; default: - cohort = new ClientTransactionCommitCohort(parent, transactionId, proxies.values()); + cohort = new ClientTransactionCommitCohort(parent, txId, toReady); break; } - return parent.onTransactionReady(transactionId, cohort); - } - - /** - * Release all state associated with this transaction. - */ - public void abort() { - if (ensureClosed()) { - for (AbstractProxyTransaction proxy : proxies.values()) { - proxy.abort(); - } - proxies.clear(); - - parent.onTransactionAbort(transactionId); - } + return parent.onTransactionReady(this, cohort); } @Override - void localAbort(final Throwable cause) { - LOG.debug("Aborting transaction {}", getIdentifier(), cause); - abort(); - } - - Map getProxies() { - return proxies; + final AbstractProxyTransaction createProxy(final Long shard) { + return parent().createTransactionProxy(getIdentifier(), shard); } }