X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=dom%2Fmdsal-dom-broker%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fmdsal%2Fdom%2Fbroker%2FShardedDOMDataTreeWriteTransaction.java;h=e1ad2f9795654578b6657e82717d386a9b1eb22d;hb=2edcadb19348d7ad4f9d6b00c96ddce2d0320214;hp=d6f44bab17fb1d41c9b79488cf0957fc721f4e3b;hpb=3266d82b4f2bbbf24f66fac99619d36a0508779c;p=mdsal.git diff --git a/dom/mdsal-dom-broker/src/main/java/org/opendaylight/mdsal/dom/broker/ShardedDOMDataTreeWriteTransaction.java b/dom/mdsal-dom-broker/src/main/java/org/opendaylight/mdsal/dom/broker/ShardedDOMDataTreeWriteTransaction.java index d6f44bab17..e1ad2f9795 100644 --- a/dom/mdsal-dom-broker/src/main/java/org/opendaylight/mdsal/dom/broker/ShardedDOMDataTreeWriteTransaction.java +++ b/dom/mdsal-dom-broker/src/main/java/org/opendaylight/mdsal/dom/broker/ShardedDOMDataTreeWriteTransaction.java @@ -10,19 +10,20 @@ package org.opendaylight.mdsal.dom.broker; import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableSet; import com.google.common.util.concurrent.CheckedFuture; +import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; -import java.util.Collection; +import com.google.common.util.concurrent.SettableFuture; import java.util.Deque; import java.util.HashMap; import java.util.HashSet; import java.util.LinkedList; +import java.util.List; import java.util.Map; import java.util.Map.Entry; import java.util.Set; -import java.util.concurrent.Callable; -import java.util.concurrent.ExecutionException; import java.util.concurrent.atomic.AtomicLong; +import java.util.stream.Collectors; import javax.annotation.Nonnull; import javax.annotation.concurrent.GuardedBy; import javax.annotation.concurrent.NotThreadSafe; @@ -116,15 +117,27 @@ final class ShardedDOMDataTreeWriteTransaction implements DOMDataTreeCursorAware Preconditions.checkState(openCursor == null, "Cannot submit transaction while there is a cursor open"); final Set txns = ImmutableSet.copyOf(idToTransaction.values()); - for (final DOMDataTreeShardWriteTransaction tx : txns) { - tx.ready(); - } + final ListenableFuture> listListenableFuture = + Futures.allAsList(txns.stream().map(tx -> { + tx.ready(); + return tx.submit(); + }).collect(Collectors.toList())); + + final SettableFuture ret = SettableFuture.create(); + Futures.addCallback(listListenableFuture, new FutureCallback>() { + @Override + public void onSuccess(final List result) { + ret.set(null); + } + + @Override + public void onFailure(final Throwable t) { + ret.setException(t); + } + }); + producer.transactionSubmitted(this); - try { - return Futures.immediateCheckedFuture(new SubmitCoordinationTask(identifier, txns).call()); - } catch (final TransactionCommitFailedException e) { - return Futures.immediateFailedCheckedFuture(e); - } + return Futures.makeChecked(ret, TransactionCommitFailedExceptionMapper.create("submit")); } synchronized void cursorClosed() { @@ -212,52 +225,4 @@ final class ShardedDOMDataTreeWriteTransaction implements DOMDataTreeCursorAware path.removeLast(); } } - - private static class SubmitCoordinationTask implements Callable { - - private static final Logger LOG = LoggerFactory.getLogger(SubmitCoordinationTask.class); - - private final String identifier; - private final Collection transactions; - - SubmitCoordinationTask(final String identifier, - final Collection transactions) { - this.identifier = identifier; - this.transactions = transactions; - } - - @Override - public Void call() throws TransactionCommitFailedException { - - try { - LOG.debug("Producer {}, submit started", identifier); - submitBlocking(); - - return null; - } catch (final TransactionCommitFailedException e) { - LOG.warn("Failure while submitting transaction for producer {}", identifier, e); - //FIXME abort here - throw e; - } - } - - void submitBlocking() throws TransactionCommitFailedException { - for (final ListenableFuture commit : submitAll()) { - try { - commit.get(); - } catch (InterruptedException | ExecutionException e) { - throw new TransactionCommitFailedException("Submit failed", e); - } - } - } - - private ListenableFuture[] submitAll() { - final ListenableFuture[] ops = new ListenableFuture[transactions.size()]; - int i = 0; - for (final DOMDataTreeShardWriteTransaction tx : transactions) { - ops[i++] = tx.submit(); - } - return ops; - } - } }