X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fsharding%2FShardProxyTransaction.java;h=dcb74fa49e6749d990131d497e16dc49efe3a160;hb=3dc48592696e6a4535c0e125c1e23dbc62bc9091;hp=b3c8dfca33f6c6553872bd243aa19673f598eebf;hpb=877c428f2897f6e3b11efd25589a84aa0c660a31;p=controller.git diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/sharding/ShardProxyTransaction.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/sharding/ShardProxyTransaction.java index b3c8dfca33..dcb74fa49e 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/sharding/ShardProxyTransaction.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/sharding/ShardProxyTransaction.java @@ -9,17 +9,27 @@ package org.opendaylight.controller.cluster.sharding; import com.google.common.base.Preconditions; +import com.google.common.util.concurrent.AsyncFunction; +import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.MoreExecutors; +import com.google.common.util.concurrent.SettableFuture; +import java.util.ArrayList; import java.util.Collection; +import java.util.List; +import java.util.Map.Entry; +import java.util.Optional; +import java.util.stream.Collectors; import javax.annotation.Nonnull; -import org.opendaylight.controller.cluster.databroker.actors.dds.ClientLocalHistory; import org.opendaylight.controller.cluster.databroker.actors.dds.ClientTransaction; -import org.opendaylight.controller.cluster.databroker.actors.dds.DataStoreClient; import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier; import org.opendaylight.mdsal.dom.api.DOMDataTreeWriteCursor; import org.opendaylight.mdsal.dom.spi.shard.DOMDataTreeShardWriteTransaction; +import org.opendaylight.mdsal.dom.spi.shard.ForeignShardModificationContext; import org.opendaylight.mdsal.dom.spi.store.DOMStoreThreePhaseCommitCohort; +import org.opendaylight.mdsal.dom.store.inmemory.ForeignShardThreePhaseCommitCohort; +import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -30,32 +40,43 @@ import org.slf4j.LoggerFactory; class ShardProxyTransaction implements DOMDataTreeShardWriteTransaction { private static final Logger LOG = LoggerFactory.getLogger(ShardProxyTransaction.class); - private static final ListenableFuture NULL_FUTURE = Futures.immediateFuture(null); - private static final ListenableFuture VALIDATE_FUTURE = Futures.immediateFuture(true); private final DOMDataTreeIdentifier shardRoot; private final Collection prefixes; - private final DataStoreClient client; - private final ClientLocalHistory history; + private final DistributedShardModification modification; private ClientTransaction currentTx; - private DOMStoreThreePhaseCommitCohort cohort; + private final List cohorts = new ArrayList<>(); + private DOMDataTreeWriteCursor cursor = null; - ShardProxyTransaction(final DOMDataTreeIdentifier shardRoot, final Collection prefixes, - final DataStoreClient client) { + ShardProxyTransaction(final DOMDataTreeIdentifier shardRoot, + final Collection prefixes, + final DistributedShardModification modification) { this.shardRoot = Preconditions.checkNotNull(shardRoot); this.prefixes = Preconditions.checkNotNull(prefixes); - this.client = Preconditions.checkNotNull(client); - history = client.createLocalHistory(); - currentTx = history.createTransaction(); + this.modification = Preconditions.checkNotNull(modification); + } + + private DOMDataTreeWriteCursor getCursor() { + if (cursor == null) { + cursor = new DistributedShardModificationCursor(modification, this); + } + return cursor; } @Nonnull @Override public DOMDataTreeWriteCursor createCursor(@Nonnull final DOMDataTreeIdentifier prefix) { checkAvailable(prefix); + final YangInstanceIdentifier relativePath = toRelative(prefix.getRootIdentifier()); + final DOMDataTreeWriteCursor ret = getCursor(); + ret.enter(relativePath.getPathArguments()); + return ret; + } - return currentTx.openCursor(); + void cursorClosed() { + cursor = null; + modification.cursorClosed(); } private void checkAvailable(final DOMDataTreeIdentifier prefix) { @@ -68,21 +89,31 @@ class ShardProxyTransaction implements DOMDataTreeShardWriteTransaction { + "Available prefixes: " + prefixes); } + private YangInstanceIdentifier toRelative(final YangInstanceIdentifier path) { + final Optional relative = + path.relativeTo(modification.getPrefix().getRootIdentifier()); + Preconditions.checkArgument(relative.isPresent()); + return relative.get(); + } + @Override public void ready() { LOG.debug("Readying transaction for shard {}", shardRoot); - Preconditions.checkState(cohort == null, "Transaction was readied already"); - cohort = currentTx.ready(); - currentTx = null; + Preconditions.checkNotNull(modification, "Attempting to ready an empty transaction."); + + cohorts.add(modification.seal()); + for (Entry entry + : modification.getChildShards().entrySet()) { + cohorts.add(new ForeignShardThreePhaseCommitCohort(entry.getKey(), entry.getValue())); + } } @Override public void close() { - if (cohort != null) { - cohort.abort(); - cohort = null; - } + cohorts.forEach(DOMStoreThreePhaseCommitCohort::abort); + cohorts.clear(); + if (currentTx != null) { currentTx.abort(); currentTx = null; @@ -93,31 +124,91 @@ class ShardProxyTransaction implements DOMDataTreeShardWriteTransaction { public ListenableFuture submit() { LOG.debug("Submitting transaction for shard {}", shardRoot); - Preconditions.checkNotNull(cohort, "Transaction not readied yet"); - return NULL_FUTURE; + checkTransactionReadied(); + + final AsyncFunction validateFunction = input -> prepare(); + final AsyncFunction prepareFunction = input -> commit(); + + // transform validate into prepare + final ListenableFuture prepareFuture = Futures.transformAsync(validate(), validateFunction, + MoreExecutors.directExecutor()); + // transform prepare into commit and return as submit result + return Futures.transformAsync(prepareFuture, prepareFunction, MoreExecutors.directExecutor()); + } + + private void checkTransactionReadied() { + Preconditions.checkState(!cohorts.isEmpty(), "Transaction not readied yet"); } @Override public ListenableFuture validate() { LOG.debug("Validating transaction for shard {}", shardRoot); - Preconditions.checkNotNull(cohort, "Transaction not readied yet"); - return VALIDATE_FUTURE; + checkTransactionReadied(); + final List> futures = + cohorts.stream().map(DOMStoreThreePhaseCommitCohort::canCommit).collect(Collectors.toList()); + final SettableFuture ret = SettableFuture.create(); + + Futures.addCallback(Futures.allAsList(futures), new FutureCallback>() { + @Override + public void onSuccess(final List result) { + ret.set(true); + } + + @Override + public void onFailure(final Throwable throwable) { + ret.setException(throwable); + } + }, MoreExecutors.directExecutor()); + + return ret; } @Override public ListenableFuture prepare() { LOG.debug("Preparing transaction for shard {}", shardRoot); - Preconditions.checkNotNull(cohort, "Transaction not readied yet"); - return NULL_FUTURE; + checkTransactionReadied(); + final List> futures = + cohorts.stream().map(DOMStoreThreePhaseCommitCohort::preCommit).collect(Collectors.toList()); + final SettableFuture ret = SettableFuture.create(); + + Futures.addCallback(Futures.allAsList(futures), new FutureCallback>() { + @Override + public void onSuccess(final List result) { + ret.set(null); + } + + @Override + public void onFailure(final Throwable throwable) { + ret.setException(throwable); + } + }, MoreExecutors.directExecutor()); + + return ret; } @Override public ListenableFuture commit() { LOG.debug("Committing transaction for shard {}", shardRoot); - Preconditions.checkNotNull(cohort, "Transaction not readied yet"); - return NULL_FUTURE; + checkTransactionReadied(); + final List> futures = + cohorts.stream().map(DOMStoreThreePhaseCommitCohort::commit).collect(Collectors.toList()); + final SettableFuture ret = SettableFuture.create(); + + Futures.addCallback(Futures.allAsList(futures), new FutureCallback>() { + @Override + public void onSuccess(final List result) { + ret.set(null); + } + + @Override + public void onFailure(final Throwable throwable) { + ret.setException(throwable); + } + }, MoreExecutors.directExecutor()); + + return ret; } }