Execute the ShardedDOMDataTreeTransaction.submit() async.
[mdsal.git] / dom / mdsal-dom-broker / src / main / java / org / opendaylight / mdsal / dom / broker / ShardedDOMDataTreeWriteTransaction.java
index d6f44bab17fb1d41c9b79488cf0957fc721f4e3b..e1ad2f9795654578b6657e82717d386a9b1eb22d 100644 (file)
@@ -10,19 +10,20 @@ package org.opendaylight.mdsal.dom.broker;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableSet;
 import com.google.common.util.concurrent.CheckedFuture;
+import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
-import java.util.Collection;
+import com.google.common.util.concurrent.SettableFuture;
 import java.util.Deque;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.LinkedList;
+import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Set;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutionException;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.stream.Collectors;
 import javax.annotation.Nonnull;
 import javax.annotation.concurrent.GuardedBy;
 import javax.annotation.concurrent.NotThreadSafe;
@@ -116,15 +117,27 @@ final class ShardedDOMDataTreeWriteTransaction implements DOMDataTreeCursorAware
         Preconditions.checkState(openCursor == null, "Cannot submit transaction while there is a cursor open");
 
         final Set<DOMDataTreeShardWriteTransaction> txns = ImmutableSet.copyOf(idToTransaction.values());
-        for (final DOMDataTreeShardWriteTransaction tx : txns) {
-            tx.ready();
-        }
+        final ListenableFuture<List<Void>> listListenableFuture =
+                Futures.allAsList(txns.stream().map(tx -> {
+                    tx.ready();
+                    return tx.submit();
+                }).collect(Collectors.toList()));
+
+        final SettableFuture<Void> ret = SettableFuture.create();
+        Futures.addCallback(listListenableFuture, new FutureCallback<List<Void>>() {
+            @Override
+            public void onSuccess(final List<Void> result) {
+                ret.set(null);
+            }
+
+            @Override
+            public void onFailure(final Throwable t) {
+                ret.setException(t);
+            }
+        });
+
         producer.transactionSubmitted(this);
-        try {
-            return Futures.immediateCheckedFuture(new SubmitCoordinationTask(identifier, txns).call());
-        } catch (final TransactionCommitFailedException e) {
-            return Futures.immediateFailedCheckedFuture(e);
-        }
+        return Futures.makeChecked(ret, TransactionCommitFailedExceptionMapper.create("submit"));
     }
 
     synchronized void cursorClosed() {
@@ -212,52 +225,4 @@ final class ShardedDOMDataTreeWriteTransaction implements DOMDataTreeCursorAware
             path.removeLast();
         }
     }
-
-    private static class SubmitCoordinationTask implements Callable<Void> {
-
-        private static final Logger LOG = LoggerFactory.getLogger(SubmitCoordinationTask.class);
-
-        private final String identifier;
-        private final Collection<DOMDataTreeShardWriteTransaction> transactions;
-
-        SubmitCoordinationTask(final String identifier,
-                                    final Collection<DOMDataTreeShardWriteTransaction> transactions) {
-            this.identifier = identifier;
-            this.transactions = transactions;
-        }
-
-        @Override
-        public Void call() throws TransactionCommitFailedException {
-
-            try {
-                LOG.debug("Producer {}, submit started", identifier);
-                submitBlocking();
-
-                return null;
-            } catch (final TransactionCommitFailedException e) {
-                LOG.warn("Failure while submitting transaction for producer {}", identifier, e);
-                //FIXME abort here
-                throw e;
-            }
-        }
-
-        void submitBlocking() throws TransactionCommitFailedException {
-            for (final ListenableFuture<?> commit : submitAll()) {
-                try {
-                    commit.get();
-                } catch (InterruptedException | ExecutionException e) {
-                    throw new TransactionCommitFailedException("Submit failed", e);
-                }
-            }
-        }
-
-        private ListenableFuture<?>[] submitAll() {
-            final ListenableFuture<?>[] ops = new ListenableFuture<?>[transactions.size()];
-            int i = 0;
-            for (final DOMDataTreeShardWriteTransaction tx : transactions) {
-                ops[i++] = tx.submit();
-            }
-            return ops;
-        }
-    }
 }