X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=dom%2Fmdsal-dom-broker%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fmdsal%2Fdom%2Fbroker%2FShardedDOMDataTreeWriteTransaction.java;h=83080f7be141b5129b84a32bb0cc62bd595e034b;hb=6ffa8194f3ae4630f958bf4ab36c79709b951799;hp=fb5c1315fecfa15cebe80ad357fbe052dd7e639d;hpb=c37d38386002ed12b279938051813f99a4de70ff;p=mdsal.git diff --git a/dom/mdsal-dom-broker/src/main/java/org/opendaylight/mdsal/dom/broker/ShardedDOMDataTreeWriteTransaction.java b/dom/mdsal-dom-broker/src/main/java/org/opendaylight/mdsal/dom/broker/ShardedDOMDataTreeWriteTransaction.java index fb5c1315fe..83080f7be1 100644 --- a/dom/mdsal-dom-broker/src/main/java/org/opendaylight/mdsal/dom/broker/ShardedDOMDataTreeWriteTransaction.java +++ b/dom/mdsal-dom-broker/src/main/java/org/opendaylight/mdsal/dom/broker/ShardedDOMDataTreeWriteTransaction.java @@ -8,21 +8,23 @@ package org.opendaylight.mdsal.dom.broker; import com.google.common.base.Preconditions; -import com.google.common.collect.ImmutableSet; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Maps; import com.google.common.util.concurrent.CheckedFuture; import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.SettableFuture; +import java.util.Collection; import java.util.Deque; -import java.util.HashMap; -import java.util.HashSet; import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Map.Entry; import java.util.Set; import java.util.concurrent.atomic.AtomicLong; +import java.util.function.BiConsumer; +import java.util.function.Consumer; import java.util.stream.Collectors; import javax.annotation.Nonnull; import javax.annotation.concurrent.GuardedBy; @@ -30,7 +32,6 @@ import javax.annotation.concurrent.NotThreadSafe; import org.opendaylight.mdsal.common.api.TransactionCommitFailedException; import org.opendaylight.mdsal.dom.api.DOMDataTreeCursorAwareTransaction; import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier; -import org.opendaylight.mdsal.dom.api.DOMDataTreeProducer; import org.opendaylight.mdsal.dom.api.DOMDataTreeWriteCursor; import org.opendaylight.mdsal.dom.store.inmemory.DOMDataTreeShardProducer; import org.opendaylight.mdsal.dom.store.inmemory.DOMDataTreeShardWriteTransaction; @@ -43,11 +44,19 @@ import org.slf4j.LoggerFactory; @NotThreadSafe final class ShardedDOMDataTreeWriteTransaction implements DOMDataTreeCursorAwareTransaction { private static final Logger LOG = LoggerFactory.getLogger(ShardedDOMDataTreeWriteTransaction.class); + private static final TransactionCommitFailedExceptionMapper SUBMIT_FAILED_MAPPER = + TransactionCommitFailedExceptionMapper.create("submit"); private static final AtomicLong COUNTER = new AtomicLong(); + private final Map idToTransaction; + private final Collection childBoundaries; private final ShardedDOMDataTreeProducer producer; private final String identifier; - private final Set childBoundaries = new HashSet<>(); + + private final SettableFuture future = SettableFuture.create(); + private final CheckedFuture submitFuture = + Futures.makeChecked(future, SUBMIT_FAILED_MAPPER); + @GuardedBy("this") private boolean closed = false; @@ -56,26 +65,24 @@ final class ShardedDOMDataTreeWriteTransaction implements DOMDataTreeCursorAware ShardedDOMDataTreeWriteTransaction(final ShardedDOMDataTreeProducer producer, final Map idToProducer, - final Map childProducers) { + final Set childRoots) { this.producer = Preconditions.checkNotNull(producer); - idToTransaction = new HashMap<>(); - Preconditions.checkNotNull(idToProducer).forEach((id, prod) -> idToTransaction.put( - id, prod.createTransaction())); this.identifier = "SHARDED-DOM-" + COUNTER.getAndIncrement(); - childProducers.forEach((id, prod) -> childBoundaries.add(id.getRootIdentifier())); + idToTransaction = ImmutableMap.copyOf(Maps.transformValues(idToProducer, + DOMDataTreeShardProducer::createTransaction)); + childBoundaries = Preconditions.checkNotNull(childRoots); + LOG.debug("Created new transaction {}", identifier); } - // FIXME: use atomic operations - @GuardedBy("this") private DOMDataTreeShardWriteTransaction lookup(final DOMDataTreeIdentifier prefix) { for (final Entry e : idToTransaction.entrySet()) { if (e.getKey().contains(prefix)) { Preconditions.checkArgument(!producer.isDelegatedToChild(prefix), - "Path %s is delegated to child producer.", - prefix); + "Path %s is delegated to child producer.", prefix); return e.getValue(); } } + throw new IllegalArgumentException(String.format("Path %s is not accessible from transaction %s", prefix, this)); } @@ -95,7 +102,7 @@ final class ShardedDOMDataTreeWriteTransaction implements DOMDataTreeCursorAware if (openCursor != null) { openCursor.close(); } - for (final DOMDataTreeShardWriteTransaction tx : ImmutableSet.copyOf(idToTransaction.values())) { + for (final DOMDataTreeShardWriteTransaction tx : idToTransaction.values()) { tx.close(); } @@ -118,28 +125,45 @@ final class ShardedDOMDataTreeWriteTransaction implements DOMDataTreeCursorAware Preconditions.checkState(!closed, "Transaction %s is already closed", identifier); Preconditions.checkState(openCursor == null, "Cannot submit transaction while there is a cursor open"); - final Set txns = ImmutableSet.copyOf(idToTransaction.values()); - final ListenableFuture> listListenableFuture = - Futures.allAsList(txns.stream().map(tx -> { - tx.ready(); - return tx.submit(); - }).collect(Collectors.toList())); + producer.processTransaction(this); + return submitFuture; + } + + CheckedFuture doSubmit( + final Consumer success, + final BiConsumer failure) { + + final ListenableFuture> listListenableFuture = Futures.allAsList( + idToTransaction.values().stream().map(tx -> { + LOG.debug("Readying tx {}", identifier); + tx.ready(); + return tx.submit(); + }).collect(Collectors.toList())); final SettableFuture ret = SettableFuture.create(); Futures.addCallback(listListenableFuture, new FutureCallback>() { @Override public void onSuccess(final List result) { + success.accept(ShardedDOMDataTreeWriteTransaction.this); ret.set(null); } @Override public void onFailure(final Throwable exp) { + failure.accept(ShardedDOMDataTreeWriteTransaction.this, exp); ret.setException(exp); } }); - producer.transactionSubmitted(this); - return Futures.makeChecked(ret, TransactionCommitFailedExceptionMapper.create("submit")); + return Futures.makeChecked(ret, SUBMIT_FAILED_MAPPER); + } + + void onTransactionSuccess(final Void result) { + future.set(result); + } + + void onTransactionFailure(final Throwable throwable) { + future.setException(throwable); } synchronized void cursorClosed() { @@ -149,10 +173,12 @@ final class ShardedDOMDataTreeWriteTransaction implements DOMDataTreeCursorAware private class DelegatingCursor implements DOMDataTreeWriteCursor { private final DOMDataTreeWriteCursor delegate; + private final DOMDataTreeIdentifier rootPosition; private final Deque path = new LinkedList<>(); DelegatingCursor(final DOMDataTreeWriteCursor delegate, final DOMDataTreeIdentifier rootPosition) { - this.delegate = delegate; + this.delegate = Preconditions.checkNotNull(delegate); + this.rootPosition = Preconditions.checkNotNull(rootPosition); path.addAll(rootPosition.getRootIdentifier().getPathArguments()); } @@ -193,6 +219,12 @@ final class ShardedDOMDataTreeWriteTransaction implements DOMDataTreeCursorAware @Override public void close() { + int depthEntered = path.size() - rootPosition.getRootIdentifier().getPathArguments().size(); + if (depthEntered > 0) { + // clean up existing modification cursor in case this tx will be reused for batching + delegate.exit(depthEntered); + } + delegate.close(); cursorClosed(); }