Execute the ShardedDOMDataTreeTransaction.submit() async. 42/44542/4
authorTomas Cere <tcere@cisco.com>
Tue, 23 Aug 2016 08:38:55 +0000 (10:38 +0200)
committerRobert Varga <nite@hq.sk>
Thu, 25 Aug 2016 17:38:35 +0000 (17:38 +0000)
Change-Id: I7e8a59f03a6ed2f3e37d0aa31a2fc12b983644a8
Signed-off-by: Tomas Cere <tcere@cisco.com>
dom/mdsal-dom-broker/src/main/java/org/opendaylight/mdsal/dom/broker/ShardedDOMDataTree.java
dom/mdsal-dom-broker/src/main/java/org/opendaylight/mdsal/dom/broker/ShardedDOMDataTreeWriteTransaction.java
dom/mdsal-dom-broker/src/test/java/org/opendaylight/mdsal/dom/broker/ShardedDOMDataTreeProducerMultiShardTest.java
dom/mdsal-dom-broker/src/test/java/org/opendaylight/mdsal/dom/broker/ShardedDOMDataTreeTest.java
dom/mdsal-dom-inmemory-datastore/src/main/java/org/opendaylight/mdsal/dom/store/inmemory/InMemoryDOMDataTreeShard.java
dom/mdsal-dom-inmemory-datastore/src/test/java/org/opendaylight/mdsal/dom/store/inmemory/InMemoryDOMDataTreeShardTest.java

index 2ef26a9dc34d7ffa581ec0853f4a51b9fc9951a0..5ea7378fea5ffc26a3e465b80e673b6d80f02521 100644 (file)
@@ -39,7 +39,6 @@ public final class ShardedDOMDataTree implements DOMDataTreeService, DOMDataTree
     @GuardedBy("this")
     private final DOMDataTreePrefixTable<DOMDataTreeProducer> producers = DOMDataTreePrefixTable.create();
 
-
     void removeShard(final ShardRegistration<?> reg) {
         final DOMDataTreeIdentifier prefix = reg.getPrefix();
         final ShardRegistration<?> parentReg;
index d6f44bab17fb1d41c9b79488cf0957fc721f4e3b..e1ad2f9795654578b6657e82717d386a9b1eb22d 100644 (file)
@@ -10,19 +10,20 @@ package org.opendaylight.mdsal.dom.broker;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableSet;
 import com.google.common.util.concurrent.CheckedFuture;
+import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
-import java.util.Collection;
+import com.google.common.util.concurrent.SettableFuture;
 import java.util.Deque;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.LinkedList;
+import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Set;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutionException;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.stream.Collectors;
 import javax.annotation.Nonnull;
 import javax.annotation.concurrent.GuardedBy;
 import javax.annotation.concurrent.NotThreadSafe;
@@ -116,15 +117,27 @@ final class ShardedDOMDataTreeWriteTransaction implements DOMDataTreeCursorAware
         Preconditions.checkState(openCursor == null, "Cannot submit transaction while there is a cursor open");
 
         final Set<DOMDataTreeShardWriteTransaction> txns = ImmutableSet.copyOf(idToTransaction.values());
-        for (final DOMDataTreeShardWriteTransaction tx : txns) {
-            tx.ready();
-        }
+        final ListenableFuture<List<Void>> listListenableFuture =
+                Futures.allAsList(txns.stream().map(tx -> {
+                    tx.ready();
+                    return tx.submit();
+                }).collect(Collectors.toList()));
+
+        final SettableFuture<Void> ret = SettableFuture.create();
+        Futures.addCallback(listListenableFuture, new FutureCallback<List<Void>>() {
+            @Override
+            public void onSuccess(final List<Void> result) {
+                ret.set(null);
+            }
+
+            @Override
+            public void onFailure(final Throwable t) {
+                ret.setException(t);
+            }
+        });
+
         producer.transactionSubmitted(this);
-        try {
-            return Futures.immediateCheckedFuture(new SubmitCoordinationTask(identifier, txns).call());
-        } catch (final TransactionCommitFailedException e) {
-            return Futures.immediateFailedCheckedFuture(e);
-        }
+        return Futures.makeChecked(ret, TransactionCommitFailedExceptionMapper.create("submit"));
     }
 
     synchronized void cursorClosed() {
@@ -212,52 +225,4 @@ final class ShardedDOMDataTreeWriteTransaction implements DOMDataTreeCursorAware
             path.removeLast();
         }
     }
-
-    private static class SubmitCoordinationTask implements Callable<Void> {
-
-        private static final Logger LOG = LoggerFactory.getLogger(SubmitCoordinationTask.class);
-
-        private final String identifier;
-        private final Collection<DOMDataTreeShardWriteTransaction> transactions;
-
-        SubmitCoordinationTask(final String identifier,
-                                    final Collection<DOMDataTreeShardWriteTransaction> transactions) {
-            this.identifier = identifier;
-            this.transactions = transactions;
-        }
-
-        @Override
-        public Void call() throws TransactionCommitFailedException {
-
-            try {
-                LOG.debug("Producer {}, submit started", identifier);
-                submitBlocking();
-
-                return null;
-            } catch (final TransactionCommitFailedException e) {
-                LOG.warn("Failure while submitting transaction for producer {}", identifier, e);
-                //FIXME abort here
-                throw e;
-            }
-        }
-
-        void submitBlocking() throws TransactionCommitFailedException {
-            for (final ListenableFuture<?> commit : submitAll()) {
-                try {
-                    commit.get();
-                } catch (InterruptedException | ExecutionException e) {
-                    throw new TransactionCommitFailedException("Submit failed", e);
-                }
-            }
-        }
-
-        private ListenableFuture<?>[] submitAll() {
-            final ListenableFuture<?>[] ops = new ListenableFuture<?>[transactions.size()];
-            int i = 0;
-            for (final DOMDataTreeShardWriteTransaction tx : transactions) {
-                ops[i++] = tx.submit();
-            }
-            return ops;
-        }
-    }
 }
index 02c1ff9ba0b6de68c9e09560d7274fa1874da7a5..83996f6cf1188e90376f98d087d7f3f6c1c16302 100644 (file)
@@ -106,7 +106,7 @@ public class ShardedDOMDataTreeProducerMultiShardTest {
     public void setUp() throws Exception {
         MockitoAnnotations.initMocks(this);
 
-        rootShard = InMemoryDOMDataTreeShard.create(ROOT_ID, executor, 1, 1);
+        rootShard = InMemoryDOMDataTreeShard.create(ROOT_ID, executor, 1);
         rootShard.onGlobalContextUpdated(schemaContext);
 
         final ShardedDOMDataTree dataTree = new ShardedDOMDataTree();
@@ -219,7 +219,7 @@ public class ShardedDOMDataTreeProducerMultiShardTest {
         final DOMDataTreeListener mockedDataTreeListener = Mockito.mock(DOMDataTreeListener.class);
         doNothing().when(mockedDataTreeListener).onDataTreeChanged(anyCollection(), anyMap());
 
-        final InMemoryDOMDataTreeShard innerShard = InMemoryDOMDataTreeShard.create(INNER_CONTAINER_ID, executor, 1, 1);
+        final InMemoryDOMDataTreeShard innerShard = InMemoryDOMDataTreeShard.create(INNER_CONTAINER_ID, executor, 1);
         innerShard.onGlobalContextUpdated(schemaContext);
         final DOMDataTreeProducer shardRegProducer = dataTreeService.createProducer(Collections.singletonList(INNER_CONTAINER_ID));
         innerShardReg = dataTreeService.registerDataTreeShard(INNER_CONTAINER_ID, innerShard, shardRegProducer);
index 528234f56e3f715b311c7d7cd7ada943ee3dfc85..6cc21b8d0575cc74c256393402e887a9bdbf7411 100644 (file)
@@ -24,7 +24,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
-import junit.framework.Assert;
 import org.junit.Before;
 import org.junit.Test;
 import org.mockito.ArgumentCaptor;
@@ -93,7 +92,7 @@ public class ShardedDOMDataTreeTest {
     public void setUp() throws Exception {
         MockitoAnnotations.initMocks(this);
 
-        rootShard = InMemoryDOMDataTreeShard.create(ROOT_ID, executor, 1, 1);
+        rootShard = InMemoryDOMDataTreeShard.create(ROOT_ID, executor, 1);
         rootShard.onGlobalContextUpdated(schemaContext);
 
         final ShardedDOMDataTree dataTree = new ShardedDOMDataTree();
@@ -114,15 +113,15 @@ public class ShardedDOMDataTreeTest {
     public void testShardRegistrationClose() throws Exception {
         rootShardReg.close();
 
-        final InMemoryDOMDataTreeShard newRootShard = InMemoryDOMDataTreeShard.create(ROOT_ID, executor, 1, 1);
+        final InMemoryDOMDataTreeShard newRootShard = InMemoryDOMDataTreeShard.create(ROOT_ID, executor, 1);
         newRootShard.onGlobalContextUpdated(schemaContext);
         final DOMDataTreeProducer shardRegProducer = dataTreeService.createProducer(Collections.singletonList(ROOT_ID));
 
-        ListenerRegistration<InMemoryDOMDataTreeShard> newRootShardReg =
+        final ListenerRegistration<InMemoryDOMDataTreeShard> newRootShardReg =
                 dataTreeService.registerDataTreeShard(ROOT_ID, rootShard, shardRegProducer);
         shardRegProducer.close();
 
-        final InMemoryDOMDataTreeShard innerShard = InMemoryDOMDataTreeShard.create(INNER_CONTAINER_ID, executor, 1, 1);
+        final InMemoryDOMDataTreeShard innerShard = InMemoryDOMDataTreeShard.create(INNER_CONTAINER_ID, executor, 1);
         innerShard.onGlobalContextUpdated(schemaContext);
         final DOMDataTreeProducer shardRegProducer2 = dataTreeService.createProducer(Collections.singletonList(INNER_CONTAINER_ID));
         ListenerRegistration<InMemoryDOMDataTreeShard> innerShardReg = dataTreeService.registerDataTreeShard(INNER_CONTAINER_ID, innerShard, shardRegProducer2);
index 21ead91ae91b392f925584d9d61bf1340057cd65..f518346daad47f84cf1ac26bcc1e3d05fcdc76f1 100644 (file)
@@ -20,6 +20,7 @@ import java.util.Iterator;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 import javax.annotation.Nonnull;
 import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
 import org.opendaylight.mdsal.dom.api.DOMDataTreeChangeListener;
@@ -27,7 +28,6 @@ import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
 import org.opendaylight.mdsal.dom.api.DOMDataTreeShard;
 import org.opendaylight.mdsal.dom.spi.DOMDataTreePrefixTable;
 import org.opendaylight.yangtools.concepts.ListenerRegistration;
-import org.opendaylight.yangtools.util.concurrent.SpecialExecutors;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.api.schema.tree.CursorAwareDataTreeSnapshot;
 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
@@ -66,28 +66,25 @@ public class InMemoryDOMDataTreeShard implements ReadableWriteableDOMDataTreeSha
     private final InMemoryDOMDataTreeShardChangePublisher shardChangePublisher;
     private final ListeningExecutorService executor;
 
-    private SchemaContext schemaContext;
-
     private InMemoryDOMDataTreeShard(final DOMDataTreeIdentifier prefix, final ExecutorService dataTreeChangeExecutor,
-                                     final int maxDataChangeListenerQueueSize, final int maxCommitQueueSize) {
+                                     final int maxDataChangeListenerQueueSize) {
         this.prefix = Preconditions.checkNotNull(prefix);
 
         final TreeType treeType = treeTypeFor(prefix.getDatastoreType());
         this.dataTree = InMemoryDataTreeFactory.getInstance().create(treeType, prefix.getRootIdentifier());
 
         this.shardChangePublisher = new InMemoryDOMDataTreeShardChangePublisher(dataTreeChangeExecutor, maxDataChangeListenerQueueSize, dataTree, prefix.getRootIdentifier(), childShards);
-        this.executor = MoreExecutors.listeningDecorator(SpecialExecutors.newBoundedSingleThreadExecutor(maxCommitQueueSize, "Shard-executor[" + prefix + "]"));
+        this.executor = MoreExecutors.listeningDecorator(Executors.newSingleThreadExecutor());
     }
 
     public static InMemoryDOMDataTreeShard create(final DOMDataTreeIdentifier id, final ExecutorService dataTreeChangeExecutor,
-                                                  final int maxDataChangeListenerQueueSize, final int maxCommitQueueSize) {
-        return new InMemoryDOMDataTreeShard(id, dataTreeChangeExecutor, maxDataChangeListenerQueueSize, maxCommitQueueSize);
+                                                  final int maxDataChangeListenerQueueSize) {
+        return new InMemoryDOMDataTreeShard(id, dataTreeChangeExecutor, maxDataChangeListenerQueueSize);
     }
 
     @Override
     public void onGlobalContextUpdated(final SchemaContext context) {
         dataTree.setSchemaContext(context);
-        schemaContext = context;
     }
 
     @Override
index aabd3725c8b092929884832e1e22f7aa34b660ca..add31fec0892226e980305aa0f5cc9780d8658a1 100644 (file)
@@ -39,7 +39,7 @@ public class InMemoryDOMDataTreeShardTest {
     public void basicTest() throws Exception {
         final InMemoryDOMDataTreeShard inMemoryDOMDataTreeShard =
                 InMemoryDOMDataTreeShard.create(DOM_DATA_TREE_IDENTIFIER,
-                        MoreExecutors.newDirectExecutorService(), 1, 1);
+                        MoreExecutors.newDirectExecutorService(), 1);
 
         final DOMDataTreeIdentifier domDataTreeIdentifier =
                 new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION,
@@ -78,7 +78,7 @@ public class InMemoryDOMDataTreeShardTest {
 
         final InMemoryDOMDataTreeShard inMemoryDOMDataTreeShard =
                 InMemoryDOMDataTreeShard.create(domDataTreeIdentifier,
-                        MoreExecutors.newDirectExecutorService(), 1 ,1);
+                        MoreExecutors.newDirectExecutorService(), 1);
 
         final InmemoryDOMDataTreeShardWriteTransaction inmemoryDOMDataTreeShardWriteTransaction =
                 mock(InmemoryDOMDataTreeShardWriteTransaction.class);