From 2edcadb19348d7ad4f9d6b00c96ddce2d0320214 Mon Sep 17 00:00:00 2001 From: Tomas Cere Date: Tue, 23 Aug 2016 10:38:55 +0200 Subject: [PATCH] Execute the ShardedDOMDataTreeTransaction.submit() async. Change-Id: I7e8a59f03a6ed2f3e37d0aa31a2fc12b983644a8 Signed-off-by: Tomas Cere --- .../mdsal/dom/broker/ShardedDOMDataTree.java | 1 - .../ShardedDOMDataTreeWriteTransaction.java | 83 ++++++------------- ...rdedDOMDataTreeProducerMultiShardTest.java | 4 +- .../dom/broker/ShardedDOMDataTreeTest.java | 9 +- .../inmemory/InMemoryDOMDataTreeShard.java | 13 ++- .../InMemoryDOMDataTreeShardTest.java | 4 +- 6 files changed, 37 insertions(+), 77 deletions(-) diff --git a/dom/mdsal-dom-broker/src/main/java/org/opendaylight/mdsal/dom/broker/ShardedDOMDataTree.java b/dom/mdsal-dom-broker/src/main/java/org/opendaylight/mdsal/dom/broker/ShardedDOMDataTree.java index 2ef26a9dc3..5ea7378fea 100644 --- a/dom/mdsal-dom-broker/src/main/java/org/opendaylight/mdsal/dom/broker/ShardedDOMDataTree.java +++ b/dom/mdsal-dom-broker/src/main/java/org/opendaylight/mdsal/dom/broker/ShardedDOMDataTree.java @@ -39,7 +39,6 @@ public final class ShardedDOMDataTree implements DOMDataTreeService, DOMDataTree @GuardedBy("this") private final DOMDataTreePrefixTable producers = DOMDataTreePrefixTable.create(); - void removeShard(final ShardRegistration reg) { final DOMDataTreeIdentifier prefix = reg.getPrefix(); final ShardRegistration parentReg; diff --git a/dom/mdsal-dom-broker/src/main/java/org/opendaylight/mdsal/dom/broker/ShardedDOMDataTreeWriteTransaction.java b/dom/mdsal-dom-broker/src/main/java/org/opendaylight/mdsal/dom/broker/ShardedDOMDataTreeWriteTransaction.java index d6f44bab17..e1ad2f9795 100644 --- a/dom/mdsal-dom-broker/src/main/java/org/opendaylight/mdsal/dom/broker/ShardedDOMDataTreeWriteTransaction.java +++ b/dom/mdsal-dom-broker/src/main/java/org/opendaylight/mdsal/dom/broker/ShardedDOMDataTreeWriteTransaction.java @@ -10,19 +10,20 @@ package org.opendaylight.mdsal.dom.broker; import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableSet; import com.google.common.util.concurrent.CheckedFuture; +import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; -import java.util.Collection; +import com.google.common.util.concurrent.SettableFuture; import java.util.Deque; import java.util.HashMap; import java.util.HashSet; import java.util.LinkedList; +import java.util.List; import java.util.Map; import java.util.Map.Entry; import java.util.Set; -import java.util.concurrent.Callable; -import java.util.concurrent.ExecutionException; import java.util.concurrent.atomic.AtomicLong; +import java.util.stream.Collectors; import javax.annotation.Nonnull; import javax.annotation.concurrent.GuardedBy; import javax.annotation.concurrent.NotThreadSafe; @@ -116,15 +117,27 @@ final class ShardedDOMDataTreeWriteTransaction implements DOMDataTreeCursorAware Preconditions.checkState(openCursor == null, "Cannot submit transaction while there is a cursor open"); final Set txns = ImmutableSet.copyOf(idToTransaction.values()); - for (final DOMDataTreeShardWriteTransaction tx : txns) { - tx.ready(); - } + final ListenableFuture> listListenableFuture = + Futures.allAsList(txns.stream().map(tx -> { + tx.ready(); + return tx.submit(); + }).collect(Collectors.toList())); + + final SettableFuture ret = SettableFuture.create(); + Futures.addCallback(listListenableFuture, new FutureCallback>() { + @Override + public void onSuccess(final List result) { + ret.set(null); + } + + @Override + public void onFailure(final Throwable t) { + ret.setException(t); + } + }); + producer.transactionSubmitted(this); - try { - return Futures.immediateCheckedFuture(new SubmitCoordinationTask(identifier, txns).call()); - } catch (final TransactionCommitFailedException e) { - return Futures.immediateFailedCheckedFuture(e); - } + return Futures.makeChecked(ret, TransactionCommitFailedExceptionMapper.create("submit")); } synchronized void cursorClosed() { @@ -212,52 +225,4 @@ final class ShardedDOMDataTreeWriteTransaction implements DOMDataTreeCursorAware path.removeLast(); } } - - private static class SubmitCoordinationTask implements Callable { - - private static final Logger LOG = LoggerFactory.getLogger(SubmitCoordinationTask.class); - - private final String identifier; - private final Collection transactions; - - SubmitCoordinationTask(final String identifier, - final Collection transactions) { - this.identifier = identifier; - this.transactions = transactions; - } - - @Override - public Void call() throws TransactionCommitFailedException { - - try { - LOG.debug("Producer {}, submit started", identifier); - submitBlocking(); - - return null; - } catch (final TransactionCommitFailedException e) { - LOG.warn("Failure while submitting transaction for producer {}", identifier, e); - //FIXME abort here - throw e; - } - } - - void submitBlocking() throws TransactionCommitFailedException { - for (final ListenableFuture commit : submitAll()) { - try { - commit.get(); - } catch (InterruptedException | ExecutionException e) { - throw new TransactionCommitFailedException("Submit failed", e); - } - } - } - - private ListenableFuture[] submitAll() { - final ListenableFuture[] ops = new ListenableFuture[transactions.size()]; - int i = 0; - for (final DOMDataTreeShardWriteTransaction tx : transactions) { - ops[i++] = tx.submit(); - } - return ops; - } - } } diff --git a/dom/mdsal-dom-broker/src/test/java/org/opendaylight/mdsal/dom/broker/ShardedDOMDataTreeProducerMultiShardTest.java b/dom/mdsal-dom-broker/src/test/java/org/opendaylight/mdsal/dom/broker/ShardedDOMDataTreeProducerMultiShardTest.java index 02c1ff9ba0..83996f6cf1 100644 --- a/dom/mdsal-dom-broker/src/test/java/org/opendaylight/mdsal/dom/broker/ShardedDOMDataTreeProducerMultiShardTest.java +++ b/dom/mdsal-dom-broker/src/test/java/org/opendaylight/mdsal/dom/broker/ShardedDOMDataTreeProducerMultiShardTest.java @@ -106,7 +106,7 @@ public class ShardedDOMDataTreeProducerMultiShardTest { public void setUp() throws Exception { MockitoAnnotations.initMocks(this); - rootShard = InMemoryDOMDataTreeShard.create(ROOT_ID, executor, 1, 1); + rootShard = InMemoryDOMDataTreeShard.create(ROOT_ID, executor, 1); rootShard.onGlobalContextUpdated(schemaContext); final ShardedDOMDataTree dataTree = new ShardedDOMDataTree(); @@ -219,7 +219,7 @@ public class ShardedDOMDataTreeProducerMultiShardTest { final DOMDataTreeListener mockedDataTreeListener = Mockito.mock(DOMDataTreeListener.class); doNothing().when(mockedDataTreeListener).onDataTreeChanged(anyCollection(), anyMap()); - final InMemoryDOMDataTreeShard innerShard = InMemoryDOMDataTreeShard.create(INNER_CONTAINER_ID, executor, 1, 1); + final InMemoryDOMDataTreeShard innerShard = InMemoryDOMDataTreeShard.create(INNER_CONTAINER_ID, executor, 1); innerShard.onGlobalContextUpdated(schemaContext); final DOMDataTreeProducer shardRegProducer = dataTreeService.createProducer(Collections.singletonList(INNER_CONTAINER_ID)); innerShardReg = dataTreeService.registerDataTreeShard(INNER_CONTAINER_ID, innerShard, shardRegProducer); diff --git a/dom/mdsal-dom-broker/src/test/java/org/opendaylight/mdsal/dom/broker/ShardedDOMDataTreeTest.java b/dom/mdsal-dom-broker/src/test/java/org/opendaylight/mdsal/dom/broker/ShardedDOMDataTreeTest.java index 528234f56e..6cc21b8d05 100644 --- a/dom/mdsal-dom-broker/src/test/java/org/opendaylight/mdsal/dom/broker/ShardedDOMDataTreeTest.java +++ b/dom/mdsal-dom-broker/src/test/java/org/opendaylight/mdsal/dom/broker/ShardedDOMDataTreeTest.java @@ -24,7 +24,6 @@ import java.util.List; import java.util.Map; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; -import junit.framework.Assert; import org.junit.Before; import org.junit.Test; import org.mockito.ArgumentCaptor; @@ -93,7 +92,7 @@ public class ShardedDOMDataTreeTest { public void setUp() throws Exception { MockitoAnnotations.initMocks(this); - rootShard = InMemoryDOMDataTreeShard.create(ROOT_ID, executor, 1, 1); + rootShard = InMemoryDOMDataTreeShard.create(ROOT_ID, executor, 1); rootShard.onGlobalContextUpdated(schemaContext); final ShardedDOMDataTree dataTree = new ShardedDOMDataTree(); @@ -114,15 +113,15 @@ public class ShardedDOMDataTreeTest { public void testShardRegistrationClose() throws Exception { rootShardReg.close(); - final InMemoryDOMDataTreeShard newRootShard = InMemoryDOMDataTreeShard.create(ROOT_ID, executor, 1, 1); + final InMemoryDOMDataTreeShard newRootShard = InMemoryDOMDataTreeShard.create(ROOT_ID, executor, 1); newRootShard.onGlobalContextUpdated(schemaContext); final DOMDataTreeProducer shardRegProducer = dataTreeService.createProducer(Collections.singletonList(ROOT_ID)); - ListenerRegistration newRootShardReg = + final ListenerRegistration newRootShardReg = dataTreeService.registerDataTreeShard(ROOT_ID, rootShard, shardRegProducer); shardRegProducer.close(); - final InMemoryDOMDataTreeShard innerShard = InMemoryDOMDataTreeShard.create(INNER_CONTAINER_ID, executor, 1, 1); + final InMemoryDOMDataTreeShard innerShard = InMemoryDOMDataTreeShard.create(INNER_CONTAINER_ID, executor, 1); innerShard.onGlobalContextUpdated(schemaContext); final DOMDataTreeProducer shardRegProducer2 = dataTreeService.createProducer(Collections.singletonList(INNER_CONTAINER_ID)); ListenerRegistration innerShardReg = dataTreeService.registerDataTreeShard(INNER_CONTAINER_ID, innerShard, shardRegProducer2); diff --git a/dom/mdsal-dom-inmemory-datastore/src/main/java/org/opendaylight/mdsal/dom/store/inmemory/InMemoryDOMDataTreeShard.java b/dom/mdsal-dom-inmemory-datastore/src/main/java/org/opendaylight/mdsal/dom/store/inmemory/InMemoryDOMDataTreeShard.java index 21ead91ae9..f518346daa 100644 --- a/dom/mdsal-dom-inmemory-datastore/src/main/java/org/opendaylight/mdsal/dom/store/inmemory/InMemoryDOMDataTreeShard.java +++ b/dom/mdsal-dom-inmemory-datastore/src/main/java/org/opendaylight/mdsal/dom/store/inmemory/InMemoryDOMDataTreeShard.java @@ -20,6 +20,7 @@ import java.util.Iterator; import java.util.Map; import java.util.Map.Entry; import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import javax.annotation.Nonnull; import org.opendaylight.mdsal.common.api.LogicalDatastoreType; import org.opendaylight.mdsal.dom.api.DOMDataTreeChangeListener; @@ -27,7 +28,6 @@ import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier; import org.opendaylight.mdsal.dom.api.DOMDataTreeShard; import org.opendaylight.mdsal.dom.spi.DOMDataTreePrefixTable; import org.opendaylight.yangtools.concepts.ListenerRegistration; -import org.opendaylight.yangtools.util.concurrent.SpecialExecutors; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; import org.opendaylight.yangtools.yang.data.api.schema.tree.CursorAwareDataTreeSnapshot; import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree; @@ -66,28 +66,25 @@ public class InMemoryDOMDataTreeShard implements ReadableWriteableDOMDataTreeSha private final InMemoryDOMDataTreeShardChangePublisher shardChangePublisher; private final ListeningExecutorService executor; - private SchemaContext schemaContext; - private InMemoryDOMDataTreeShard(final DOMDataTreeIdentifier prefix, final ExecutorService dataTreeChangeExecutor, - final int maxDataChangeListenerQueueSize, final int maxCommitQueueSize) { + final int maxDataChangeListenerQueueSize) { this.prefix = Preconditions.checkNotNull(prefix); final TreeType treeType = treeTypeFor(prefix.getDatastoreType()); this.dataTree = InMemoryDataTreeFactory.getInstance().create(treeType, prefix.getRootIdentifier()); this.shardChangePublisher = new InMemoryDOMDataTreeShardChangePublisher(dataTreeChangeExecutor, maxDataChangeListenerQueueSize, dataTree, prefix.getRootIdentifier(), childShards); - this.executor = MoreExecutors.listeningDecorator(SpecialExecutors.newBoundedSingleThreadExecutor(maxCommitQueueSize, "Shard-executor[" + prefix + "]")); + this.executor = MoreExecutors.listeningDecorator(Executors.newSingleThreadExecutor()); } public static InMemoryDOMDataTreeShard create(final DOMDataTreeIdentifier id, final ExecutorService dataTreeChangeExecutor, - final int maxDataChangeListenerQueueSize, final int maxCommitQueueSize) { - return new InMemoryDOMDataTreeShard(id, dataTreeChangeExecutor, maxDataChangeListenerQueueSize, maxCommitQueueSize); + final int maxDataChangeListenerQueueSize) { + return new InMemoryDOMDataTreeShard(id, dataTreeChangeExecutor, maxDataChangeListenerQueueSize); } @Override public void onGlobalContextUpdated(final SchemaContext context) { dataTree.setSchemaContext(context); - schemaContext = context; } @Override diff --git a/dom/mdsal-dom-inmemory-datastore/src/test/java/org/opendaylight/mdsal/dom/store/inmemory/InMemoryDOMDataTreeShardTest.java b/dom/mdsal-dom-inmemory-datastore/src/test/java/org/opendaylight/mdsal/dom/store/inmemory/InMemoryDOMDataTreeShardTest.java index aabd3725c8..add31fec08 100644 --- a/dom/mdsal-dom-inmemory-datastore/src/test/java/org/opendaylight/mdsal/dom/store/inmemory/InMemoryDOMDataTreeShardTest.java +++ b/dom/mdsal-dom-inmemory-datastore/src/test/java/org/opendaylight/mdsal/dom/store/inmemory/InMemoryDOMDataTreeShardTest.java @@ -39,7 +39,7 @@ public class InMemoryDOMDataTreeShardTest { public void basicTest() throws Exception { final InMemoryDOMDataTreeShard inMemoryDOMDataTreeShard = InMemoryDOMDataTreeShard.create(DOM_DATA_TREE_IDENTIFIER, - MoreExecutors.newDirectExecutorService(), 1, 1); + MoreExecutors.newDirectExecutorService(), 1); final DOMDataTreeIdentifier domDataTreeIdentifier = new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION, @@ -78,7 +78,7 @@ public class InMemoryDOMDataTreeShardTest { final InMemoryDOMDataTreeShard inMemoryDOMDataTreeShard = InMemoryDOMDataTreeShard.create(domDataTreeIdentifier, - MoreExecutors.newDirectExecutorService(), 1 ,1); + MoreExecutors.newDirectExecutorService(), 1); final InmemoryDOMDataTreeShardWriteTransaction inmemoryDOMDataTreeShardWriteTransaction = mock(InmemoryDOMDataTreeShardWriteTransaction.class); -- 2.36.6