From 5bcd1acc0d6407ff27365e3dce7810589eae2a2d Mon Sep 17 00:00:00 2001 From: tpantelis Date: Tue, 29 Jul 2014 06:42:14 -0400 Subject: [PATCH] Bug 1430: Off-load notifications from single commit thread Modified the InMemoryDOMDataStore to use the new QueuedNotificationManager class added to yangtools common util for DataChangeListener notifications. Modified DOMDataCommitCoordinatorImpl's ListeningExecutorService to one that off-loads ListenableFuture Runnable callbacks on a separate executor. Change-Id: I31f2fb002131c6d91b205d33255dd1bbc6433d9b Signed-off-by: tpantelis --- .../test/DataBrokerTestCustomizer.java | 6 +- .../binding/test/util/BindingTestContext.java | 7 +- .../datastore/DistributedDataStore.java | 25 +++++-- .../controller/cluster/datastore/Shard.java | 9 +-- .../DataChangeListenerRegistrationTest.java | 5 +- .../datastore/ShardTransactionChainTest.java | 7 +- .../datastore/ShardTransactionTest.java | 19 ++++- .../AbstractModificationTest.java | 3 +- .../dom/impl/DomInmemoryDataBrokerModule.java | 64 ++++++++++++---- .../impl/DOMDataCommitCoordinatorImpl.java | 15 +++- .../broker/impl/DOMBrokerPerformanceTest.java | 6 +- .../md/sal/dom/broker/impl/DOMBrokerTest.java | 58 ++++++++++++++- .../broker/impl/DOMTransactionChainTest.java | 6 +- ...InMemoryConfigDataStoreProviderModule.java | 11 +-- ...oryOperationalDataStoreProviderModule.java | 11 +-- .../store/impl/ChangeListenerNotifyTask.java | 20 +++-- .../dom/store/impl/InMemoryDOMDataStore.java | 73 ++++++++++++++++--- .../impl/InMemoryDOMDataStoreFactory.java | 72 ++++++++++++++++++ .../impl/ResolveDataChangeEventsTask.java | 34 ++++++--- .../impl/AbstractDataChangeListenerTest.java | 19 ++++- .../sal/dom/store/impl/DatastoreTestTask.java | 40 ++++++++-- .../DefaultDataChangeListenerTestSuite.java | 19 +++-- .../dom/store/impl/InMemoryDataStoreTest.java | 3 +- .../dom/store/impl/RootScopeSubtreeTest.java | 16 ++-- .../impl/SchemaUpdateForTransactionTest.java | 3 +- .../store/impl/TestDCLExecutorService.java | 43 +++++++++++ .../store/impl/WildcardedScopeBaseTest.java | 21 ++---- .../store/impl/WildcardedScopeOneTest.java | 21 ++---- .../impl/WildcardedScopeSubtreeTest.java | 16 ++-- 29 files changed, 494 insertions(+), 158 deletions(-) create mode 100644 opendaylight/md-sal/sal-inmemory-datastore/src/main/java/org/opendaylight/controller/md/sal/dom/store/impl/InMemoryDOMDataStoreFactory.java create mode 100644 opendaylight/md-sal/sal-inmemory-datastore/src/test/java/org/opendaylight/controller/md/sal/dom/store/impl/TestDCLExecutorService.java diff --git a/opendaylight/md-sal/sal-binding-broker/src/test/java/org/opendaylight/controller/md/sal/binding/test/DataBrokerTestCustomizer.java b/opendaylight/md-sal/sal-binding-broker/src/test/java/org/opendaylight/controller/md/sal/binding/test/DataBrokerTestCustomizer.java index e0f6f3546f..666c819c82 100644 --- a/opendaylight/md-sal/sal-binding-broker/src/test/java/org/opendaylight/controller/md/sal/binding/test/DataBrokerTestCustomizer.java +++ b/opendaylight/md-sal/sal-binding-broker/src/test/java/org/opendaylight/controller/md/sal/binding/test/DataBrokerTestCustomizer.java @@ -46,13 +46,15 @@ public class DataBrokerTestCustomizer { } public DOMStore createConfigurationDatastore() { - InMemoryDOMDataStore store = new InMemoryDOMDataStore("CFG", MoreExecutors.sameThreadExecutor()); + InMemoryDOMDataStore store = new InMemoryDOMDataStore("CFG", + MoreExecutors.sameThreadExecutor(), MoreExecutors.sameThreadExecutor()); schemaService.registerSchemaContextListener(store); return store; } public DOMStore createOperationalDatastore() { - InMemoryDOMDataStore store = new InMemoryDOMDataStore("OPER", MoreExecutors.sameThreadExecutor()); + InMemoryDOMDataStore store = new InMemoryDOMDataStore("OPER", + MoreExecutors.sameThreadExecutor(), MoreExecutors.sameThreadExecutor()); schemaService.registerSchemaContextListener(store); return store; } diff --git a/opendaylight/md-sal/sal-binding-broker/src/test/java/org/opendaylight/controller/sal/binding/test/util/BindingTestContext.java b/opendaylight/md-sal/sal-binding-broker/src/test/java/org/opendaylight/controller/sal/binding/test/util/BindingTestContext.java index deb4a8aeca..fef5715f50 100644 --- a/opendaylight/md-sal/sal-binding-broker/src/test/java/org/opendaylight/controller/sal/binding/test/util/BindingTestContext.java +++ b/opendaylight/md-sal/sal-binding-broker/src/test/java/org/opendaylight/controller/sal/binding/test/util/BindingTestContext.java @@ -63,6 +63,7 @@ import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; import com.google.common.collect.MutableClassToInstanceMap; import com.google.common.util.concurrent.ListeningExecutorService; +import com.google.common.util.concurrent.MoreExecutors; @Beta public class BindingTestContext implements AutoCloseable { @@ -133,8 +134,10 @@ public class BindingTestContext implements AutoCloseable { public void startNewDomDataBroker() { checkState(executor != null, "Executor needs to be set"); - InMemoryDOMDataStore operStore = new InMemoryDOMDataStore("OPER", executor); - InMemoryDOMDataStore configStore = new InMemoryDOMDataStore("CFG", executor); + InMemoryDOMDataStore operStore = new InMemoryDOMDataStore("OPER", executor, + MoreExecutors.sameThreadExecutor()); + InMemoryDOMDataStore configStore = new InMemoryDOMDataStore("CFG", executor, + MoreExecutors.sameThreadExecutor()); newDatastores = ImmutableMap.builder() .put(LogicalDatastoreType.OPERATIONAL, operStore) .put(LogicalDatastoreType.CONFIGURATION, configStore) diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DistributedDataStore.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DistributedDataStore.java index 479af79748..c43307643b 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DistributedDataStore.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DistributedDataStore.java @@ -25,6 +25,8 @@ import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransactio import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransactionChain; import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction; import org.opendaylight.yangtools.concepts.ListenerRegistration; +import org.opendaylight.yangtools.util.PropertyUtils; +import org.opendaylight.yangtools.util.concurrent.SpecialExecutors; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; import org.opendaylight.yangtools.yang.model.api.SchemaContext; @@ -32,8 +34,6 @@ import org.opendaylight.yangtools.yang.model.api.SchemaContextListener; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.concurrent.Executors; - /** * */ @@ -42,25 +42,34 @@ public class DistributedDataStore implements DOMStore, SchemaContextListener, Au private static final Logger LOG = LoggerFactory.getLogger(DistributedDataStore.class); - private static final int DEFAULT_EXECUTOR_POOL_SIZE = 10; + private static final String EXECUTOR_MAX_POOL_SIZE_PROP = + "mdsal.dist-datastore-executor-pool.size"; + private static final int DEFAULT_EXECUTOR_MAX_POOL_SIZE = 10; + + private static final String EXECUTOR_MAX_QUEUE_SIZE_PROP = + "mdsal.dist-datastore-executor-queue.size"; + private static final int DEFAULT_EXECUTOR_MAX_QUEUE_SIZE = 5000; private final String type; private final ActorContext actorContext; private SchemaContext schemaContext; - - /** * Executor used to run FutureTask's * * This is typically used when we need to make a request to an actor and * wait for it's response and the consumer needs to be provided a Future. - * - * FIXME : Make the thread pool size configurable. */ private final ListeningExecutorService executor = - MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(DEFAULT_EXECUTOR_POOL_SIZE)); + MoreExecutors.listeningDecorator( + SpecialExecutors.newBlockingBoundedFastThreadPool( + PropertyUtils.getIntSystemProperty( + EXECUTOR_MAX_POOL_SIZE_PROP, + DEFAULT_EXECUTOR_MAX_POOL_SIZE), + PropertyUtils.getIntSystemProperty( + EXECUTOR_MAX_QUEUE_SIZE_PROP, + DEFAULT_EXECUTOR_MAX_QUEUE_SIZE), "DistDataStore")); public DistributedDataStore(ActorSystem actorSystem, String type, ClusterWrapper cluster, Configuration configuration) { this(new ActorContext(actorSystem, actorSystem diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java index 10dbbc84d8..8f058a34c2 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java @@ -17,8 +17,6 @@ import akka.japi.Creator; import akka.serialization.Serialization; import com.google.common.base.Optional; import com.google.common.util.concurrent.ListenableFuture; -import com.google.common.util.concurrent.ListeningExecutorService; -import com.google.common.util.concurrent.MoreExecutors; import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardMBeanFactory; import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats; import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply; @@ -39,6 +37,7 @@ import org.opendaylight.controller.cluster.raft.DefaultConfigParamsImpl; import org.opendaylight.controller.cluster.raft.RaftActor; import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener; import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore; +import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStoreFactory; import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction; import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort; import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransactionChain; @@ -52,7 +51,6 @@ import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.concurrent.ExecutionException; -import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; /** @@ -67,9 +65,6 @@ public class Shard extends RaftActor { public static final String DEFAULT_NAME = "default"; - private final ListeningExecutorService storeExecutor = - MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(2)); - private final InMemoryDOMDataStore store; private final Map @@ -101,7 +96,7 @@ public class Shard extends RaftActor { LOG.info("Creating shard : {} persistent : {}", name, persistent); - store = new InMemoryDOMDataStore(name, storeExecutor); + store = InMemoryDOMDataStoreFactory.create(name, null); shardMBean = ShardMBeanFactory.getShardStatsMBean(name); diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DataChangeListenerRegistrationTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DataChangeListenerRegistrationTest.java index 920248521a..eb2c24292a 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DataChangeListenerRegistrationTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DataChangeListenerRegistrationTest.java @@ -21,7 +21,8 @@ import static org.junit.Assert.assertEquals; public class DataChangeListenerRegistrationTest extends AbstractActorTest { private static ListeningExecutorService storeExecutor = MoreExecutors.listeningDecorator(MoreExecutors.sameThreadExecutor()); - private static final InMemoryDOMDataStore store = new InMemoryDOMDataStore("OPER", storeExecutor); + private static final InMemoryDOMDataStore store = new InMemoryDOMDataStore("OPER", storeExecutor, + MoreExecutors.sameThreadExecutor()); static { store.onGlobalContextUpdated(TestModel.createTestContext()); @@ -37,12 +38,14 @@ public class DataChangeListenerRegistrationTest extends AbstractActorTest { final ActorRef subject = getSystem().actorOf(props, "testCloseListenerRegistration"); new Within(duration("1 seconds")) { + @Override protected void run() { subject.tell(new CloseDataChangeListenerRegistration().toSerializable(), getRef()); final String out = new ExpectMsg(duration("1 seconds"), "match hint") { // do not put code outside this method, will run afterwards + @Override protected String match(Object in) { if (in.getClass().equals(CloseDataChangeListenerRegistrationReply.SERIALIZABLE_CLASS)) { return "match"; diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTransactionChainTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTransactionChainTest.java index b35880a6a5..d468af6664 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTransactionChainTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTransactionChainTest.java @@ -19,7 +19,8 @@ public class ShardTransactionChainTest extends AbstractActorTest { private static ListeningExecutorService storeExecutor = MoreExecutors.listeningDecorator(MoreExecutors.sameThreadExecutor()); - private static final InMemoryDOMDataStore store = new InMemoryDOMDataStore("OPER", storeExecutor); + private static final InMemoryDOMDataStore store = new InMemoryDOMDataStore("OPER", storeExecutor, + MoreExecutors.sameThreadExecutor()); static { store.onGlobalContextUpdated(TestModel.createTestContext()); @@ -31,12 +32,14 @@ public class ShardTransactionChainTest extends AbstractActorTest { final ActorRef subject = getSystem().actorOf(props, "testCreateTransaction"); new Within(duration("1 seconds")) { + @Override protected void run() { subject.tell(new CreateTransaction("txn-1", TransactionProxy.TransactionType.READ_ONLY.ordinal() ).toSerializable(), getRef()); final String out = new ExpectMsg(duration("1 seconds"), "match hint") { // do not put code outside this method, will run afterwards + @Override protected String match(Object in) { if (in.getClass().equals(CreateTransactionReply.SERIALIZABLE_CLASS)) { return CreateTransactionReply.fromSerializable(in).getTransactionPath(); @@ -66,12 +69,14 @@ public class ShardTransactionChainTest extends AbstractActorTest { final ActorRef subject = getSystem().actorOf(props, "testCloseTransactionChain"); new Within(duration("1 seconds")) { + @Override protected void run() { subject.tell(new CloseTransactionChain().toSerializable(), getRef()); final String out = new ExpectMsg(duration("1 seconds"), "match hint") { // do not put code outside this method, will run afterwards + @Override protected String match(Object in) { if (in.getClass().equals(CloseTransactionChainReply.SERIALIZABLE_CLASS)) { return "match"; diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTransactionTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTransactionTest.java index 632ecc29cd..6fe5154d55 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTransactionTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTransactionTest.java @@ -42,7 +42,7 @@ public class ShardTransactionTest extends AbstractActorTest { MoreExecutors.listeningDecorator(MoreExecutors.sameThreadExecutor()); private static final InMemoryDOMDataStore store = - new InMemoryDOMDataStore("OPER", storeExecutor); + new InMemoryDOMDataStore("OPER", storeExecutor, MoreExecutors.sameThreadExecutor()); private static final SchemaContext testSchemaContext = TestModel.createTestContext(); @@ -59,6 +59,7 @@ public class ShardTransactionTest extends AbstractActorTest { final ActorRef subject = getSystem().actorOf(props, "testReadData"); new Within(duration("1 seconds")) { + @Override protected void run() { subject.tell( @@ -67,6 +68,7 @@ public class ShardTransactionTest extends AbstractActorTest { final String out = new ExpectMsg(duration("1 seconds"), "match hint") { // do not put code outside this method, will run afterwards + @Override protected String match(Object in) { if (in.getClass().equals(ReadDataReply.SERIALIZABLE_CLASS)) { if (ReadDataReply.fromSerializable(testSchemaContext,YangInstanceIdentifier.builder().build(), in) @@ -99,6 +101,7 @@ public class ShardTransactionTest extends AbstractActorTest { final ActorRef subject = getSystem().actorOf(props, "testReadDataWhenDataNotFound"); new Within(duration("1 seconds")) { + @Override protected void run() { subject.tell( @@ -107,6 +110,7 @@ public class ShardTransactionTest extends AbstractActorTest { final String out = new ExpectMsg(duration("1 seconds"), "match hint") { // do not put code outside this method, will run afterwards + @Override protected String match(Object in) { if (in.getClass().equals(ReadDataReply.SERIALIZABLE_CLASS)) { if (ReadDataReply.fromSerializable(testSchemaContext,TestModel.TEST_PATH, in) @@ -135,6 +139,7 @@ public class ShardTransactionTest extends AbstractActorTest { final Class modificationType) { new JavaTestKit(getSystem()) {{ new Within(duration("1 seconds")) { + @Override protected void run() { subject .tell(new ShardTransaction.GetCompositedModification(), @@ -143,6 +148,7 @@ public class ShardTransactionTest extends AbstractActorTest { final CompositeModification compositeModification = new ExpectMsg(duration("1 seconds"), "match hint") { // do not put code outside this method, will run afterwards + @Override protected CompositeModification match(Object in) { if (in instanceof ShardTransaction.GetCompositeModificationReply) { return ((ShardTransaction.GetCompositeModificationReply) in) @@ -174,6 +180,7 @@ public class ShardTransactionTest extends AbstractActorTest { getSystem().actorOf(props, "testWriteData"); new Within(duration("1 seconds")) { + @Override protected void run() { subject.tell(new WriteData(TestModel.TEST_PATH, @@ -182,6 +189,7 @@ public class ShardTransactionTest extends AbstractActorTest { final String out = new ExpectMsg(duration("1 seconds"), "match hint") { // do not put code outside this method, will run afterwards + @Override protected String match(Object in) { if (in.getClass().equals(WriteDataReply.SERIALIZABLE_CLASS)) { return "match"; @@ -212,6 +220,7 @@ public class ShardTransactionTest extends AbstractActorTest { getSystem().actorOf(props, "testMergeData"); new Within(duration("1 seconds")) { + @Override protected void run() { subject.tell(new MergeData(TestModel.TEST_PATH, @@ -220,6 +229,7 @@ public class ShardTransactionTest extends AbstractActorTest { final String out = new ExpectMsg(duration("500 milliseconds"), "match hint") { // do not put code outside this method, will run afterwards + @Override protected String match(Object in) { if (in.getClass().equals(MergeDataReply.SERIALIZABLE_CLASS)) { return "match"; @@ -251,12 +261,14 @@ public class ShardTransactionTest extends AbstractActorTest { getSystem().actorOf(props, "testDeleteData"); new Within(duration("1 seconds")) { + @Override protected void run() { subject.tell(new DeleteData(TestModel.TEST_PATH).toSerializable(), getRef()); final String out = new ExpectMsg(duration("1 seconds"), "match hint") { // do not put code outside this method, will run afterwards + @Override protected String match(Object in) { if (in.getClass().equals(DeleteDataReply.SERIALIZABLE_CLASS)) { return "match"; @@ -288,12 +300,14 @@ public class ShardTransactionTest extends AbstractActorTest { getSystem().actorOf(props, "testReadyTransaction"); new Within(duration("1 seconds")) { + @Override protected void run() { subject.tell(new ReadyTransaction().toSerializable(), getRef()); final String out = new ExpectMsg(duration("1 seconds"), "match hint") { // do not put code outside this method, will run afterwards + @Override protected String match(Object in) { if (in.getClass().equals(ReadyTransactionReply.SERIALIZABLE_CLASS)) { return "match"; @@ -326,12 +340,14 @@ public class ShardTransactionTest extends AbstractActorTest { watch(subject); new Within(duration("2 seconds")) { + @Override protected void run() { subject.tell(new CloseTransaction().toSerializable(), getRef()); final String out = new ExpectMsg(duration("1 seconds"), "match hint") { // do not put code outside this method, will run afterwards + @Override protected String match(Object in) { if (in.getClass().equals(CloseTransactionReply.SERIALIZABLE_CLASS)) { return "match"; @@ -345,6 +361,7 @@ public class ShardTransactionTest extends AbstractActorTest { final String termination = new ExpectMsg(duration("1 seconds"), "match hint") { // do not put code outside this method, will run afterwards + @Override protected String match(Object in) { if (in instanceof Terminated) { return "match"; diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/modification/AbstractModificationTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/modification/AbstractModificationTest.java index d9c550a6db..84f3b92f1b 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/modification/AbstractModificationTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/modification/AbstractModificationTest.java @@ -26,7 +26,8 @@ public abstract class AbstractModificationTest { @Before public void setUp(){ - store = new InMemoryDOMDataStore("test", MoreExecutors.sameThreadExecutor()); + store = new InMemoryDOMDataStore("test", MoreExecutors.sameThreadExecutor(), + MoreExecutors.sameThreadExecutor()); store.onGlobalContextUpdated(TestModel.createTestContext()); } diff --git a/opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/config/yang/md/sal/dom/impl/DomInmemoryDataBrokerModule.java b/opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/config/yang/md/sal/dom/impl/DomInmemoryDataBrokerModule.java index 22dad6af23..948f3c8d8b 100644 --- a/opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/config/yang/md/sal/dom/impl/DomInmemoryDataBrokerModule.java +++ b/opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/config/yang/md/sal/dom/impl/DomInmemoryDataBrokerModule.java @@ -7,18 +7,18 @@ */ package org.opendaylight.controller.config.yang.md.sal.dom.impl; -import java.util.concurrent.Executors; - +import java.util.concurrent.Executor; +import java.util.concurrent.ExecutorService; import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType; import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitDeadlockException; import org.opendaylight.controller.md.sal.dom.broker.impl.DOMDataBrokerImpl; -import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore; +import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStoreFactory; import org.opendaylight.controller.sal.core.spi.data.DOMStore; import org.opendaylight.yangtools.util.concurrent.DeadlockDetectingListeningExecutorService; +import org.opendaylight.yangtools.util.concurrent.SpecialExecutors; +import org.opendaylight.yangtools.util.PropertyUtils; import com.google.common.collect.ImmutableMap; -import com.google.common.util.concurrent.ListeningExecutorService; -import com.google.common.util.concurrent.MoreExecutors; /** * @@ -26,6 +26,17 @@ import com.google.common.util.concurrent.MoreExecutors; public final class DomInmemoryDataBrokerModule extends org.opendaylight.controller.config.yang.md.sal.dom.impl.AbstractDomInmemoryDataBrokerModule { + private static final String FUTURE_CALLBACK_EXECUTOR_MAX_QUEUE_SIZE_PROP = + "mdsal.datastore-future-callback-queue.size"; + private static final int DEFAULT_FUTURE_CALLBACK_EXECUTOR_MAX_QUEUE_SIZE = 1000; + + private static final String FUTURE_CALLBACK_EXECUTOR_MAX_POOL_SIZE_PROP = + "mdsal.datastore-future-callback-pool.size"; + private static final int DEFAULT_FUTURE_CALLBACK_EXECUTOR_MAX_POOL_SIZE = 20; + private static final String COMMIT_EXECUTOR_MAX_QUEUE_SIZE_PROP = + "mdsal.datastore-commit-queue.size"; + private static final int DEFAULT_COMMIT_EXECUTOR_MAX_QUEUE_SIZE = 5000; + public DomInmemoryDataBrokerModule(final org.opendaylight.controller.config.api.ModuleIdentifier identifier, final org.opendaylight.controller.config.api.DependencyResolver dependencyResolver) { super(identifier, dependencyResolver); @@ -45,30 +56,55 @@ public final class DomInmemoryDataBrokerModule extends @Override public java.lang.AutoCloseable createInstance() { - ListeningExecutorService storeExecutor = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(2)); //Initializing Operational DOM DataStore defaulting to InMemoryDOMDataStore if one is not configured DOMStore operStore = getOperationalDataStoreDependency(); if(operStore == null){ //we will default to InMemoryDOMDataStore creation - operStore = new InMemoryDOMDataStore("DOM-OPER", storeExecutor); - //here we will register the SchemaContext listener - getSchemaServiceDependency().registerSchemaContextListener((InMemoryDOMDataStore)operStore); + operStore = InMemoryDOMDataStoreFactory.create("DOM-OPER", getSchemaServiceDependency()); } DOMStore configStore = getConfigDataStoreDependency(); if(configStore == null){ //we will default to InMemoryDOMDataStore creation - configStore = new InMemoryDOMDataStore("DOM-CFG", storeExecutor); - //here we will register the SchemaContext listener - getSchemaServiceDependency().registerSchemaContextListener((InMemoryDOMDataStore)configStore); + configStore = InMemoryDOMDataStoreFactory.create("DOM-CFG", getSchemaServiceDependency()); } ImmutableMap datastores = ImmutableMap . builder().put(LogicalDatastoreType.OPERATIONAL, operStore) .put(LogicalDatastoreType.CONFIGURATION, configStore).build(); + /* + * We use a single-threaded executor for commits with a bounded queue capacity. If the + * queue capacity is reached, subsequent commit tasks will be rejected and the commits will + * fail. This is done to relieve back pressure. This should be an extreme scenario - either + * there's deadlock(s) somewhere and the controller is unstable or some rogue component is + * continuously hammering commits too fast or the controller is just over-capacity for the + * system it's running on. + */ + ExecutorService commitExecutor = SpecialExecutors.newBoundedSingleThreadExecutor( + PropertyUtils.getIntSystemProperty( + COMMIT_EXECUTOR_MAX_QUEUE_SIZE_PROP, + DEFAULT_COMMIT_EXECUTOR_MAX_QUEUE_SIZE), "WriteTxCommit"); + + /* + * We use an executor for commit ListenableFuture callbacks that favors reusing available + * threads over creating new threads at the expense of execution time. The assumption is + * that most ListenableFuture callbacks won't execute a lot of business logic where we want + * it to run quicker - many callbacks will likely just handle error conditions and do + * nothing on success. The executor queue capacity is bounded and, if the capacity is + * reached, subsequent submitted tasks will block the caller. + */ + Executor listenableFutureExecutor = SpecialExecutors.newBlockingBoundedCachedThreadPool( + PropertyUtils.getIntSystemProperty( + FUTURE_CALLBACK_EXECUTOR_MAX_POOL_SIZE_PROP, + DEFAULT_FUTURE_CALLBACK_EXECUTOR_MAX_POOL_SIZE), + PropertyUtils.getIntSystemProperty( + FUTURE_CALLBACK_EXECUTOR_MAX_QUEUE_SIZE_PROP, + DEFAULT_FUTURE_CALLBACK_EXECUTOR_MAX_QUEUE_SIZE), "CommitFutures"); + DOMDataBrokerImpl newDataBroker = new DOMDataBrokerImpl(datastores, - new DeadlockDetectingListeningExecutorService(Executors.newSingleThreadExecutor(), - TransactionCommitDeadlockException.DEADLOCK_EXECUTOR_FUNCTION)); + new DeadlockDetectingListeningExecutorService(commitExecutor, + TransactionCommitDeadlockException.DEADLOCK_EXECUTOR_FUNCTION, + listenableFutureExecutor)); return newDataBroker; } diff --git a/opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/md/sal/dom/broker/impl/DOMDataCommitCoordinatorImpl.java b/opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/md/sal/dom/broker/impl/DOMDataCommitCoordinatorImpl.java index 9a6d12fb18..521e2d0e73 100644 --- a/opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/md/sal/dom/broker/impl/DOMDataCommitCoordinatorImpl.java +++ b/opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/md/sal/dom/broker/impl/DOMDataCommitCoordinatorImpl.java @@ -9,6 +9,7 @@ package org.opendaylight.controller.md.sal.dom.broker.impl; import java.util.List; import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; +import java.util.concurrent.RejectedExecutionException; import javax.annotation.concurrent.GuardedBy; @@ -86,8 +87,18 @@ public class DOMDataCommitCoordinatorImpl implements DOMDataCommitExecutor { Preconditions.checkArgument(cohorts != null, "Cohorts must not be null."); Preconditions.checkArgument(listener != null, "Listener must not be null"); LOG.debug("Tx: {} is submitted for execution.", transaction.getIdentifier()); - ListenableFuture commitFuture = executor.submit(new CommitCoordinationTask( - transaction, cohorts, listener)); + + ListenableFuture commitFuture = null; + try { + commitFuture = executor.submit(new CommitCoordinationTask(transaction, cohorts, listener)); + } catch(RejectedExecutionException e) { + LOG.error("The commit executor's queue is full - submit task was rejected. \n" + + executor, e); + return Futures.immediateFailedCheckedFuture( + new TransactionCommitFailedException( + "Could not submit the commit task - the commit queue capacity has been exceeded.", e)); + } + if (listener.isPresent()) { Futures.addCallback(commitFuture, new DOMDataCommitErrorInvoker(transaction, listener.get())); } diff --git a/opendaylight/md-sal/sal-dom-broker/src/test/java/org/opendaylight/controller/md/sal/dom/broker/impl/DOMBrokerPerformanceTest.java b/opendaylight/md-sal/sal-dom-broker/src/test/java/org/opendaylight/controller/md/sal/dom/broker/impl/DOMBrokerPerformanceTest.java index 181396fc88..e9ed5b1b30 100644 --- a/opendaylight/md-sal/sal-dom-broker/src/test/java/org/opendaylight/controller/md/sal/dom/broker/impl/DOMBrokerPerformanceTest.java +++ b/opendaylight/md-sal/sal-dom-broker/src/test/java/org/opendaylight/controller/md/sal/dom/broker/impl/DOMBrokerPerformanceTest.java @@ -63,8 +63,10 @@ public class DOMBrokerPerformanceTest { @Before public void setupStore() { - InMemoryDOMDataStore operStore = new InMemoryDOMDataStore("OPER", MoreExecutors.sameThreadExecutor()); - InMemoryDOMDataStore configStore = new InMemoryDOMDataStore("CFG", MoreExecutors.sameThreadExecutor()); + InMemoryDOMDataStore operStore = new InMemoryDOMDataStore("OPER", + MoreExecutors.sameThreadExecutor(), MoreExecutors.sameThreadExecutor()); + InMemoryDOMDataStore configStore = new InMemoryDOMDataStore("CFG", + MoreExecutors.sameThreadExecutor(), MoreExecutors.sameThreadExecutor()); schemaContext = TestModel.createTestContext(); operStore.onGlobalContextUpdated(schemaContext); diff --git a/opendaylight/md-sal/sal-dom-broker/src/test/java/org/opendaylight/controller/md/sal/dom/broker/impl/DOMBrokerTest.java b/opendaylight/md-sal/sal-dom-broker/src/test/java/org/opendaylight/controller/md/sal/dom/broker/impl/DOMBrokerTest.java index 0bb16a39b9..e57d08f173 100644 --- a/opendaylight/md-sal/sal-dom-broker/src/test/java/org/opendaylight/controller/md/sal/dom/broker/impl/DOMBrokerTest.java +++ b/opendaylight/md-sal/sal-dom-broker/src/test/java/org/opendaylight/controller/md/sal/dom/broker/impl/DOMBrokerTest.java @@ -7,19 +7,24 @@ import static org.junit.Assert.assertEquals; import static org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType.CONFIGURATION; import static org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType.OPERATIONAL; +import java.util.Collections; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; import org.junit.After; import org.junit.Before; import org.junit.Test; +import org.mockito.Mockito; import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataChangeScope; import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeEvent; import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType; import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitDeadlockException; +import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException; import org.opendaylight.controller.md.sal.dom.api.DOMDataChangeListener; import org.opendaylight.controller.md.sal.dom.api.DOMDataReadTransaction; import org.opendaylight.controller.md.sal.dom.api.DOMDataReadWriteTransaction; @@ -28,6 +33,7 @@ import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore; import org.opendaylight.controller.md.sal.dom.store.impl.TestModel; import org.opendaylight.controller.sal.core.spi.data.DOMStore; import org.opendaylight.yangtools.util.concurrent.DeadlockDetectingListeningExecutorService; +import org.opendaylight.yangtools.util.concurrent.SpecialExecutors; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes; @@ -35,6 +41,7 @@ import org.opendaylight.yangtools.yang.model.api.SchemaContext; import com.google.common.base.Optional; import com.google.common.collect.ImmutableMap; +import com.google.common.util.concurrent.ForwardingExecutorService; import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; @@ -46,11 +53,16 @@ public class DOMBrokerTest { private SchemaContext schemaContext; private DOMDataBrokerImpl domBroker; private ListeningExecutorService executor; + private ExecutorService futureExecutor; + private CommitExecutorService commitExecutor; @Before public void setupStore() { - InMemoryDOMDataStore operStore = new InMemoryDOMDataStore("OPER", MoreExecutors.sameThreadExecutor()); - InMemoryDOMDataStore configStore = new InMemoryDOMDataStore("CFG", MoreExecutors.sameThreadExecutor()); + + InMemoryDOMDataStore operStore = new InMemoryDOMDataStore("OPER", + MoreExecutors.sameThreadExecutor(), MoreExecutors.sameThreadExecutor()); + InMemoryDOMDataStore configStore = new InMemoryDOMDataStore("CFG", + MoreExecutors.sameThreadExecutor(), MoreExecutors.sameThreadExecutor()); schemaContext = TestModel.createTestContext(); operStore.onGlobalContextUpdated(schemaContext); @@ -61,8 +73,10 @@ public class DOMBrokerTest { .put(OPERATIONAL, operStore) // .build(); - executor = new DeadlockDetectingListeningExecutorService(Executors.newSingleThreadExecutor(), - TransactionCommitDeadlockException.DEADLOCK_EXECUTOR_FUNCTION); + commitExecutor = new CommitExecutorService(Executors.newSingleThreadExecutor()); + futureExecutor = SpecialExecutors.newBlockingBoundedCachedThreadPool(1, 5, "FCB"); + executor = new DeadlockDetectingListeningExecutorService(commitExecutor, + TransactionCommitDeadlockException.DEADLOCK_EXECUTOR_FUNCTION, futureExecutor); domBroker = new DOMDataBrokerImpl(stores, executor); } @@ -71,6 +85,10 @@ public class DOMBrokerTest { if( executor != null ) { executor.shutdownNow(); } + + if(futureExecutor != null) { + futureExecutor.shutdownNow(); + } } @Test(timeout=10000) @@ -137,6 +155,24 @@ public class DOMBrokerTest { assertTrue(afterCommitRead.isPresent()); } + @Test(expected=TransactionCommitFailedException.class) + public void testRejectedCommit() throws Exception { + + commitExecutor.delegate = Mockito.mock( ExecutorService.class ); + Mockito.doThrow( new RejectedExecutionException( "mock" ) ) + .when( commitExecutor.delegate ).execute( Mockito.any( Runnable.class ) ); + Mockito.doNothing().when( commitExecutor.delegate ).shutdown(); + Mockito.doReturn( Collections.emptyList() ).when( commitExecutor.delegate ).shutdownNow(); + Mockito.doReturn( "" ).when( commitExecutor.delegate ).toString(); + Mockito.doReturn( true ).when( commitExecutor.delegate ) + .awaitTermination( Mockito.anyLong(), Mockito.any( TimeUnit.class ) ); + + DOMDataReadWriteTransaction writeTx = domBroker.newReadWriteTransaction(); + writeTx.put( OPERATIONAL, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME) ); + + writeTx.submit().checkedGet( 5, TimeUnit.SECONDS ); + } + /** * Tests a simple DataChangeListener notification after a write. */ @@ -306,4 +342,18 @@ public class DOMBrokerTest { assertTrue( "onDataChanged was not called", latch.await( 5, TimeUnit.SECONDS ) ); } } + + static class CommitExecutorService extends ForwardingExecutorService { + + ExecutorService delegate; + + public CommitExecutorService( ExecutorService delegate ) { + this.delegate = delegate; + } + + @Override + protected ExecutorService delegate() { + return delegate; + } + } } diff --git a/opendaylight/md-sal/sal-dom-broker/src/test/java/org/opendaylight/controller/md/sal/dom/broker/impl/DOMTransactionChainTest.java b/opendaylight/md-sal/sal-dom-broker/src/test/java/org/opendaylight/controller/md/sal/dom/broker/impl/DOMTransactionChainTest.java index 3ea0bcefa5..18b11c8300 100644 --- a/opendaylight/md-sal/sal-dom-broker/src/test/java/org/opendaylight/controller/md/sal/dom/broker/impl/DOMTransactionChainTest.java +++ b/opendaylight/md-sal/sal-dom-broker/src/test/java/org/opendaylight/controller/md/sal/dom/broker/impl/DOMTransactionChainTest.java @@ -44,8 +44,10 @@ public class DOMTransactionChainTest { @Before public void setupStore() { - InMemoryDOMDataStore operStore = new InMemoryDOMDataStore("OPER", MoreExecutors.sameThreadExecutor()); - InMemoryDOMDataStore configStore = new InMemoryDOMDataStore("CFG", MoreExecutors.sameThreadExecutor()); + InMemoryDOMDataStore operStore = new InMemoryDOMDataStore("OPER", + MoreExecutors.sameThreadExecutor(), MoreExecutors.sameThreadExecutor()); + InMemoryDOMDataStore configStore = new InMemoryDOMDataStore("CFG", + MoreExecutors.sameThreadExecutor(), MoreExecutors.sameThreadExecutor()); schemaContext = TestModel.createTestContext(); operStore.onGlobalContextUpdated(schemaContext); diff --git a/opendaylight/md-sal/sal-inmemory-datastore/src/main/java/org/opendaylight/controller/config/yang/inmemory_datastore_provider/InMemoryConfigDataStoreProviderModule.java b/opendaylight/md-sal/sal-inmemory-datastore/src/main/java/org/opendaylight/controller/config/yang/inmemory_datastore_provider/InMemoryConfigDataStoreProviderModule.java index 805608d479..39a448ff6c 100644 --- a/opendaylight/md-sal/sal-inmemory-datastore/src/main/java/org/opendaylight/controller/config/yang/inmemory_datastore_provider/InMemoryConfigDataStoreProviderModule.java +++ b/opendaylight/md-sal/sal-inmemory-datastore/src/main/java/org/opendaylight/controller/config/yang/inmemory_datastore_provider/InMemoryConfigDataStoreProviderModule.java @@ -1,12 +1,9 @@ package org.opendaylight.controller.config.yang.inmemory_datastore_provider; -import java.util.concurrent.Executors; - -import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore; - -import com.google.common.util.concurrent.MoreExecutors; +import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStoreFactory; public class InMemoryConfigDataStoreProviderModule extends org.opendaylight.controller.config.yang.inmemory_datastore_provider.AbstractInMemoryConfigDataStoreProviderModule { + public InMemoryConfigDataStoreProviderModule(final org.opendaylight.controller.config.api.ModuleIdentifier identifier, final org.opendaylight.controller.config.api.DependencyResolver dependencyResolver) { super(identifier, dependencyResolver); } @@ -22,9 +19,7 @@ public class InMemoryConfigDataStoreProviderModule extends org.opendaylight.cont @Override public java.lang.AutoCloseable createInstance() { - InMemoryDOMDataStore ids = new InMemoryDOMDataStore("DOM-CFG", MoreExecutors.listeningDecorator(Executors.newSingleThreadExecutor())); - getSchemaServiceDependency().registerSchemaContextListener(ids); - return ids; + return InMemoryDOMDataStoreFactory.create("DOM-CFG", getSchemaServiceDependency()); } } diff --git a/opendaylight/md-sal/sal-inmemory-datastore/src/main/java/org/opendaylight/controller/config/yang/inmemory_datastore_provider/InMemoryOperationalDataStoreProviderModule.java b/opendaylight/md-sal/sal-inmemory-datastore/src/main/java/org/opendaylight/controller/config/yang/inmemory_datastore_provider/InMemoryOperationalDataStoreProviderModule.java index f4795588ab..615fe0211c 100644 --- a/opendaylight/md-sal/sal-inmemory-datastore/src/main/java/org/opendaylight/controller/config/yang/inmemory_datastore_provider/InMemoryOperationalDataStoreProviderModule.java +++ b/opendaylight/md-sal/sal-inmemory-datastore/src/main/java/org/opendaylight/controller/config/yang/inmemory_datastore_provider/InMemoryOperationalDataStoreProviderModule.java @@ -1,12 +1,9 @@ package org.opendaylight.controller.config.yang.inmemory_datastore_provider; -import java.util.concurrent.Executors; - -import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore; - -import com.google.common.util.concurrent.MoreExecutors; +import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStoreFactory; public class InMemoryOperationalDataStoreProviderModule extends org.opendaylight.controller.config.yang.inmemory_datastore_provider.AbstractInMemoryOperationalDataStoreProviderModule { + public InMemoryOperationalDataStoreProviderModule(final org.opendaylight.controller.config.api.ModuleIdentifier identifier, final org.opendaylight.controller.config.api.DependencyResolver dependencyResolver) { super(identifier, dependencyResolver); } @@ -22,9 +19,7 @@ public class InMemoryOperationalDataStoreProviderModule extends org.opendaylight @Override public java.lang.AutoCloseable createInstance() { - InMemoryDOMDataStore ids = new InMemoryDOMDataStore("DOM-OPER", MoreExecutors.listeningDecorator(Executors.newSingleThreadExecutor())); - getOperationalSchemaServiceDependency().registerSchemaContextListener(ids); - return ids; + return InMemoryDOMDataStoreFactory.create("DOM-OPER", getOperationalSchemaServiceDependency()); } } diff --git a/opendaylight/md-sal/sal-inmemory-datastore/src/main/java/org/opendaylight/controller/md/sal/dom/store/impl/ChangeListenerNotifyTask.java b/opendaylight/md-sal/sal-inmemory-datastore/src/main/java/org/opendaylight/controller/md/sal/dom/store/impl/ChangeListenerNotifyTask.java index 27325d84a9..ac1f2e32d5 100644 --- a/opendaylight/md-sal/sal-inmemory-datastore/src/main/java/org/opendaylight/controller/md/sal/dom/store/impl/ChangeListenerNotifyTask.java +++ b/opendaylight/md-sal/sal-inmemory-datastore/src/main/java/org/opendaylight/controller/md/sal/dom/store/impl/ChangeListenerNotifyTask.java @@ -8,6 +8,8 @@ package org.opendaylight.controller.md.sal.dom.store.impl; import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeEvent; +import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener; +import org.opendaylight.yangtools.util.concurrent.NotificationManager; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; import org.slf4j.Logger; @@ -16,31 +18,33 @@ import org.slf4j.LoggerFactory; class ChangeListenerNotifyTask implements Runnable { private static final Logger LOG = LoggerFactory.getLogger(ChangeListenerNotifyTask.class); + private final Iterable> listeners; private final AsyncDataChangeEvent> event; + @SuppressWarnings("rawtypes") + private final NotificationManager + notificationMgr; + + @SuppressWarnings("rawtypes") public ChangeListenerNotifyTask(final Iterable> listeners, - final AsyncDataChangeEvent> event) { + final AsyncDataChangeEvent> event, + final NotificationManager notificationMgr) { this.listeners = listeners; this.event = event; + this.notificationMgr = notificationMgr; } @Override public void run() { for (DataChangeListenerRegistration listener : listeners) { - try { - listener.getInstance().onDataChanged(event); - } catch (Exception e) { - LOG.error("Unhandled exception during invoking listener {} with event {}", listener, event, e); - } + notificationMgr.submitNotification(listener.getInstance(), event); } - } @Override public String toString() { return "ChangeListenerNotifyTask [listeners=" + listeners + ", event=" + event + "]"; } - } diff --git a/opendaylight/md-sal/sal-inmemory-datastore/src/main/java/org/opendaylight/controller/md/sal/dom/store/impl/InMemoryDOMDataStore.java b/opendaylight/md-sal/sal-inmemory-datastore/src/main/java/org/opendaylight/controller/md/sal/dom/store/impl/InMemoryDOMDataStore.java index c44d0909d6..b61b367103 100644 --- a/opendaylight/md-sal/sal-inmemory-datastore/src/main/java/org/opendaylight/controller/md/sal/dom/store/impl/InMemoryDOMDataStore.java +++ b/opendaylight/md-sal/sal-inmemory-datastore/src/main/java/org/opendaylight/controller/md/sal/dom/store/impl/InMemoryDOMDataStore.java @@ -13,11 +13,17 @@ import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.ListeningExecutorService; + import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataChangeScope; +import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeEvent; import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener; import org.opendaylight.controller.md.sal.common.api.data.OptimisticLockFailedException; import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException; import org.opendaylight.controller.md.sal.dom.store.impl.SnapshotBackedWriteTransaction.TransactionReadyPrototype; +import org.opendaylight.yangtools.util.ExecutorServiceUtil; +import org.opendaylight.yangtools.util.PropertyUtils; +import org.opendaylight.yangtools.util.concurrent.NotificationManager; +import org.opendaylight.yangtools.util.concurrent.QueuedNotificationManager; import org.opendaylight.yangtools.yang.data.api.schema.tree.ConflictingModificationAppliedException; import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree; import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate; @@ -43,8 +49,11 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import javax.annotation.concurrent.GuardedBy; + import java.util.Collections; import java.util.concurrent.Callable; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; import static com.google.common.base.Preconditions.checkState; @@ -61,16 +70,51 @@ import static com.google.common.base.Preconditions.checkState; public class InMemoryDOMDataStore implements DOMStore, Identifiable, SchemaContextListener, TransactionReadyPrototype,AutoCloseable { private static final Logger LOG = LoggerFactory.getLogger(InMemoryDOMDataStore.class); + + @SuppressWarnings("rawtypes") + private static final QueuedNotificationManager.Invoker DCL_NOTIFICATION_MGR_INVOKER = + new QueuedNotificationManager.Invoker() { + + @SuppressWarnings("unchecked") + @Override + public void invokeListener( AsyncDataChangeListener listener, + AsyncDataChangeEvent notification ) { + listener.onDataChanged(notification); + } + }; + + private static final String DCL_NOTIFICATION_MGR_MAX_QUEUE_SIZE_PROP = + "mdsal.datastore-dcl-notification-queue.size"; + + private static final int DEFAULT_DCL_NOTIFICATION_MGR_MAX_QUEUE_SIZE = 1000; + private final DataTree dataTree = InMemoryDataTreeFactory.getInstance().create(); private final ListenerTree listenerTree = ListenerTree.create(); private final AtomicLong txCounter = new AtomicLong(0); - private final ListeningExecutorService executor; + private final ListeningExecutorService listeningExecutor; + + @SuppressWarnings("rawtypes") + private final NotificationManager + dataChangeListenerNotificationManager; + private final ExecutorService dataChangeListenerExecutor; private final String name; - public InMemoryDOMDataStore(final String name, final ListeningExecutorService executor) { + public InMemoryDOMDataStore(final String name, final ListeningExecutorService listeningExecutor, + final ExecutorService dataChangeListenerExecutor) { this.name = Preconditions.checkNotNull(name); - this.executor = Preconditions.checkNotNull(executor); + this.listeningExecutor = Preconditions.checkNotNull(listeningExecutor); + + this.dataChangeListenerExecutor = Preconditions.checkNotNull(dataChangeListenerExecutor); + + int maxDCLQueueSize = PropertyUtils.getIntSystemProperty( + DCL_NOTIFICATION_MGR_MAX_QUEUE_SIZE_PROP, DEFAULT_DCL_NOTIFICATION_MGR_MAX_QUEUE_SIZE ); + + dataChangeListenerNotificationManager = + new QueuedNotificationManager<>(this.dataChangeListenerExecutor, + DCL_NOTIFICATION_MGR_INVOKER, maxDCLQueueSize, "DataChangeListenerQueueMgr"); } @Override @@ -104,8 +148,9 @@ public class InMemoryDOMDataStore implements DOMStore, Identifiable, Sch } @Override - public void close(){ - executor.shutdownNow(); + public void close() { + ExecutorServiceUtil.tryGracefulShutdown(listeningExecutor, 30, TimeUnit.SECONDS); + ExecutorServiceUtil.tryGracefulShutdown(dataChangeListenerExecutor, 30, TimeUnit.SECONDS); } @Override public >> ListenerRegistration registerChangeListener( @@ -132,7 +177,9 @@ public class InMemoryDOMDataStore implements DOMStore, Identifiable, Sch .setAfter(data) // .addCreated(path, data) // .build(); - executor.submit(new ChangeListenerNotifyTask(Collections.singletonList(reg), event)); + + new ChangeListenerNotifyTask(Collections.singletonList(reg), event, + dataChangeListenerNotificationManager).run(); } } @@ -221,8 +268,9 @@ public class InMemoryDOMDataStore implements DOMStore, Identifiable, Sch @Override public void close() { - executor.shutdownNow(); - + // FIXME: this call doesn't look right here - listeningExecutor is shared and owned + // by the outer class. + //listeningExecutor.shutdownNow(); } protected synchronized void onTransactionFailed(final SnapshotBackedWriteTransaction transaction, @@ -308,7 +356,7 @@ public class InMemoryDOMDataStore implements DOMStore, Identifiable, Sch @Override public ListenableFuture canCommit() { - return executor.submit(new Callable() { + return listeningExecutor.submit(new Callable() { @Override public Boolean call() throws TransactionCommitFailedException { try { @@ -330,11 +378,12 @@ public class InMemoryDOMDataStore implements DOMStore, Identifiable, Sch @Override public ListenableFuture preCommit() { - return executor.submit(new Callable() { + return listeningExecutor.submit(new Callable() { @Override public Void call() { candidate = dataTree.prepare(modification); - listenerResolver = ResolveDataChangeEventsTask.create(candidate, listenerTree); + listenerResolver = ResolveDataChangeEventsTask.create(candidate, listenerTree, + dataChangeListenerNotificationManager); return null; } }); @@ -359,7 +408,7 @@ public class InMemoryDOMDataStore implements DOMStore, Identifiable, Sch for (ChangeListenerNotifyTask task : listenerResolver.call()) { LOG.trace("Scheduling invocation of listeners: {}", task); - executor.submit(task); + task.run(); } } diff --git a/opendaylight/md-sal/sal-inmemory-datastore/src/main/java/org/opendaylight/controller/md/sal/dom/store/impl/InMemoryDOMDataStoreFactory.java b/opendaylight/md-sal/sal-inmemory-datastore/src/main/java/org/opendaylight/controller/md/sal/dom/store/impl/InMemoryDOMDataStoreFactory.java new file mode 100644 index 0000000000..c853a132de --- /dev/null +++ b/opendaylight/md-sal/sal-inmemory-datastore/src/main/java/org/opendaylight/controller/md/sal/dom/store/impl/InMemoryDOMDataStoreFactory.java @@ -0,0 +1,72 @@ +/* + * Copyright (c) 2014 Brocade Communications Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ + +package org.opendaylight.controller.md.sal.dom.store.impl; + +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; + +import javax.annotation.Nullable; + +import org.opendaylight.controller.sal.core.api.model.SchemaService; +import org.opendaylight.yangtools.util.concurrent.SpecialExecutors; +import org.opendaylight.yangtools.util.PropertyUtils; +import com.google.common.util.concurrent.MoreExecutors; + +/** + * A factory for creating InMemoryDOMDataStore instances. + * + * @author Thomas Pantelis + */ +public final class InMemoryDOMDataStoreFactory { + + private static final String DCL_EXECUTOR_MAX_QUEUE_SIZE_PROP = + "mdsal.datastore-dcl-notification-queue.size"; + private static final int DEFAULT_DCL_EXECUTOR_MAX_QUEUE_SIZE = 1000; + + private static final String DCL_EXECUTOR_MAX_POOL_SIZE_PROP = + "mdsal.datastore-dcl-notification-pool.size"; + private static final int DEFAULT_DCL_EXECUTOR_MAX_POOL_SIZE = 20; + + private InMemoryDOMDataStoreFactory() { + } + + /** + * Creates an InMemoryDOMDataStore instance. + * + * @param name the name of the data store + * @param schemaService the SchemaService to which to register the data store. + * @return an InMemoryDOMDataStore instance + */ + public static InMemoryDOMDataStore create(final String name, + @Nullable final SchemaService schemaService) { + + // For DataChangeListener notifications we use an executor that provides the fastest + // task execution time to get higher throughput as DataChangeListeners typically provide + // much of the business logic for a data model. If the executor queue size limit is reached, + // subsequent submitted notifications will block the calling thread. + + int dclExecutorMaxQueueSize = PropertyUtils.getIntSystemProperty( + DCL_EXECUTOR_MAX_QUEUE_SIZE_PROP, DEFAULT_DCL_EXECUTOR_MAX_QUEUE_SIZE); + int dclExecutorMaxPoolSize = PropertyUtils.getIntSystemProperty( + DCL_EXECUTOR_MAX_POOL_SIZE_PROP, DEFAULT_DCL_EXECUTOR_MAX_POOL_SIZE); + + ExecutorService dataChangeListenerExecutor = SpecialExecutors.newBlockingBoundedFastThreadPool( + dclExecutorMaxPoolSize, dclExecutorMaxQueueSize, name + "-DCL" ); + + InMemoryDOMDataStore dataStore = new InMemoryDOMDataStore(name, + MoreExecutors.listeningDecorator(Executors.newSingleThreadExecutor()), + dataChangeListenerExecutor); + + if(schemaService != null) { + schemaService.registerSchemaContextListener(dataStore); + } + + return dataStore; + } +} diff --git a/opendaylight/md-sal/sal-inmemory-datastore/src/main/java/org/opendaylight/controller/md/sal/dom/store/impl/ResolveDataChangeEventsTask.java b/opendaylight/md-sal/sal-inmemory-datastore/src/main/java/org/opendaylight/controller/md/sal/dom/store/impl/ResolveDataChangeEventsTask.java index 3ddf0b60fa..d8feaa71f6 100644 --- a/opendaylight/md-sal/sal-inmemory-datastore/src/main/java/org/opendaylight/controller/md/sal/dom/store/impl/ResolveDataChangeEventsTask.java +++ b/opendaylight/md-sal/sal-inmemory-datastore/src/main/java/org/opendaylight/controller/md/sal/dom/store/impl/ResolveDataChangeEventsTask.java @@ -24,12 +24,15 @@ import java.util.Map.Entry; import java.util.Set; import java.util.concurrent.Callable; +import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeEvent; +import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener; import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataChangeScope; import org.opendaylight.controller.md.sal.dom.store.impl.DOMImmutableDataChangeEvent.Builder; import org.opendaylight.controller.md.sal.dom.store.impl.DOMImmutableDataChangeEvent.SimpleEventFactory; import org.opendaylight.controller.md.sal.dom.store.impl.tree.ListenerTree; import org.opendaylight.controller.md.sal.dom.store.impl.tree.ListenerTree.Node; import org.opendaylight.controller.md.sal.dom.store.impl.tree.ListenerTree.Walker; +import org.opendaylight.yangtools.util.concurrent.NotificationManager; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifier; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifierWithPredicates; @@ -57,9 +60,15 @@ final class ResolveDataChangeEventsTask implements Callable notificationMgr; + + @SuppressWarnings("rawtypes") + public ResolveDataChangeEventsTask(final DataTreeCandidate candidate, final ListenerTree listenerTree, + final NotificationManager notificationMgr) { this.candidate = Preconditions.checkNotNull(candidate); this.listenerRoot = Preconditions.checkNotNull(listenerTree); + this.notificationMgr = Preconditions.checkNotNull(notificationMgr); } /** @@ -120,7 +129,7 @@ final class ResolveDataChangeEventsTask implements Callable taskListBuilder, + private void addNotificationTask(final ImmutableList.Builder taskListBuilder, final ListenerTree.Node listeners, final Collection entries) { if (!entries.isEmpty()) { @@ -141,7 +150,7 @@ final class ResolveDataChangeEventsTask implements Callable taskListBuilder, final ListenerTree.Node listeners, final DOMImmutableDataChangeEvent event) { DataChangeScope eventScope = event.getScope(); @@ -150,11 +159,11 @@ final class ResolveDataChangeEventsTask implements Callable> listenerSet = Collections .> singletonList(listenerReg); if (eventScope == DataChangeScope.BASE) { - taskListBuilder.add(new ChangeListenerNotifyTask(listenerSet, event)); + taskListBuilder.add(new ChangeListenerNotifyTask(listenerSet, event, notificationMgr)); } else if (eventScope == DataChangeScope.ONE && listenerScope != DataChangeScope.BASE) { - taskListBuilder.add(new ChangeListenerNotifyTask(listenerSet, event)); + taskListBuilder.add(new ChangeListenerNotifyTask(listenerSet, event, notificationMgr)); } else if (eventScope == DataChangeScope.SUBTREE && listenerScope == DataChangeScope.SUBTREE) { - taskListBuilder.add(new ChangeListenerNotifyTask(listenerSet, event)); + taskListBuilder.add(new ChangeListenerNotifyTask(listenerSet, event, notificationMgr)); } } } @@ -172,7 +181,7 @@ final class ResolveDataChangeEventsTask implements Callable taskListBuilder, final ListenerTree.Node listeners, final Collection entries) { @@ -210,14 +219,14 @@ final class ResolveDataChangeEventsTask implements Callable taskListBuilder, final Node listeners, final DOMImmutableDataChangeEvent event) { for (DataChangeListenerRegistration listener : listeners.getListeners()) { if (listener.getScope() == event.getScope()) { Set> listenerSet = Collections .> singleton(listener); - taskListBuilder.add(new ChangeListenerNotifyTask(listenerSet, event)); + taskListBuilder.add(new ChangeListenerNotifyTask(listenerSet, event, notificationMgr)); } } } @@ -519,7 +528,10 @@ final class ResolveDataChangeEventsTask implements Callable notificationMgr) { + return new ResolveDataChangeEventsTask(candidate, listenerTree, notificationMgr); } } diff --git a/opendaylight/md-sal/sal-inmemory-datastore/src/test/java/org/opendaylight/controller/md/sal/dom/store/impl/AbstractDataChangeListenerTest.java b/opendaylight/md-sal/sal-inmemory-datastore/src/test/java/org/opendaylight/controller/md/sal/dom/store/impl/AbstractDataChangeListenerTest.java index 3176ca764d..76a9354d1a 100644 --- a/opendaylight/md-sal/sal-inmemory-datastore/src/test/java/org/opendaylight/controller/md/sal/dom/store/impl/AbstractDataChangeListenerTest.java +++ b/opendaylight/md-sal/sal-inmemory-datastore/src/test/java/org/opendaylight/controller/md/sal/dom/store/impl/AbstractDataChangeListenerTest.java @@ -9,7 +9,7 @@ package org.opendaylight.controller.md.sal.dom.store.impl; import java.util.Collection; import java.util.Map; - +import org.junit.After; import org.junit.Assert; import org.junit.Before; import org.opendaylight.controller.md.sal.dom.store.impl.DatastoreTestTask.WriteTransactionCustomizer; @@ -18,6 +18,7 @@ import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controll import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.test.list.rev140701.two.level.list.TopLevelList; import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.test.list.rev140701.two.level.list.top.level.list.NestedList; import org.opendaylight.yangtools.sal.binding.generator.impl.ModuleInfoBackedContext; +import org.opendaylight.yangtools.util.concurrent.SpecialExecutors; import org.opendaylight.yangtools.yang.binding.YangModuleInfo; import org.opendaylight.yangtools.yang.binding.util.BindingReflections; import org.opendaylight.yangtools.yang.common.QName; @@ -48,6 +49,7 @@ public abstract class AbstractDataChangeListenerTest { private InMemoryDOMDataStore datastore; private SchemaContext schemaContext; + private TestDCLExecutorService dclExecutorService; @Before public final void setup() throws Exception { @@ -56,13 +58,24 @@ public abstract class AbstractDataChangeListenerTest { ModuleInfoBackedContext context = ModuleInfoBackedContext.create(); context.registerModuleInfo(moduleInfo); schemaContext = context.tryToCreateSchemaContext().get(); + + dclExecutorService = new TestDCLExecutorService( + SpecialExecutors.newBlockingBoundedFastThreadPool(1, 10, "DCL" )); + datastore = new InMemoryDOMDataStore("TEST", - MoreExecutors.sameThreadExecutor()); + MoreExecutors.sameThreadExecutor(), dclExecutorService ); datastore.onGlobalContextUpdated(schemaContext); } + @After + public void tearDown() { + if( dclExecutorService != null ) { + dclExecutorService.shutdownNow(); + } + } + public final DatastoreTestTask newTestTask() { - return new DatastoreTestTask(datastore).cleanup(DatastoreTestTask + return new DatastoreTestTask(datastore, dclExecutorService).cleanup(DatastoreTestTask .simpleDelete(TOP_LEVEL)); } diff --git a/opendaylight/md-sal/sal-inmemory-datastore/src/test/java/org/opendaylight/controller/md/sal/dom/store/impl/DatastoreTestTask.java b/opendaylight/md-sal/sal-inmemory-datastore/src/test/java/org/opendaylight/controller/md/sal/dom/store/impl/DatastoreTestTask.java index 26987a6fba..98d79bee8b 100644 --- a/opendaylight/md-sal/sal-inmemory-datastore/src/test/java/org/opendaylight/controller/md/sal/dom/store/impl/DatastoreTestTask.java +++ b/opendaylight/md-sal/sal-inmemory-datastore/src/test/java/org/opendaylight/controller/md/sal/dom/store/impl/DatastoreTestTask.java @@ -8,9 +8,11 @@ package org.opendaylight.controller.md.sal.dom.store.impl; import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; import java.util.concurrent.ExecutionException; -import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataChangeScope; import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeEvent; @@ -37,11 +39,13 @@ public class DatastoreTestTask { private WriteTransactionCustomizer cleanup; private YangInstanceIdentifier changePath; private DataChangeScope changeScope; - private boolean postSetup = false; + private volatile boolean postSetup = false; private final ChangeEventListener internalListener; + private final TestDCLExecutorService dclExecutorService; - public DatastoreTestTask(final DOMStore datastore) { + public DatastoreTestTask(final DOMStore datastore, final TestDCLExecutorService dclExecutorService) { this.store = datastore; + this.dclExecutorService = dclExecutorService; internalListener = new ChangeEventListener(); } @@ -79,7 +83,7 @@ public class DatastoreTestTask { return this; } - public void run() throws InterruptedException, ExecutionException { + public void run() throws InterruptedException, ExecutionException, TimeoutException { if (setup != null) { execute(setup); } @@ -89,13 +93,17 @@ public class DatastoreTestTask { } Preconditions.checkState(write != null, "Write Transaction must be set."); + postSetup = true; + dclExecutorService.afterTestSetup(); + execute(write); if (registration != null) { registration.close(); } + if (changeListener != null) { - changeListener.onDataChanged(internalListener.receivedChange.get()); + changeListener.onDataChanged(getChangeEvent()); } if (read != null) { read.verify(store.newReadOnlyTransaction()); @@ -105,8 +113,26 @@ public class DatastoreTestTask { } } - public Future>> getChangeEvent() { - return internalListener.receivedChange; + public AsyncDataChangeEvent> getChangeEvent() { + try { + return internalListener.receivedChange.get(10, TimeUnit.SECONDS); + } catch( Exception e ) { + fail( "Error getting the AsyncDataChangeEvent from the Future: " + e ); + } + + // won't get here + return null; + } + + public void verifyNoChangeEvent() { + try { + Object unexpected = internalListener.receivedChange.get(500, TimeUnit.MILLISECONDS); + fail( "Got unexpected AsyncDataChangeEvent from the Future: " + unexpected ); + } catch( TimeoutException e ) { + // Expected + } catch( Exception e ) { + fail( "Error getting the AsyncDataChangeEvent from the Future: " + e ); + } } private void execute(final WriteTransactionCustomizer writeCustomizer) throws InterruptedException, diff --git a/opendaylight/md-sal/sal-inmemory-datastore/src/test/java/org/opendaylight/controller/md/sal/dom/store/impl/DefaultDataChangeListenerTestSuite.java b/opendaylight/md-sal/sal-inmemory-datastore/src/test/java/org/opendaylight/controller/md/sal/dom/store/impl/DefaultDataChangeListenerTestSuite.java index 54d2043dc7..84337de419 100644 --- a/opendaylight/md-sal/sal-inmemory-datastore/src/test/java/org/opendaylight/controller/md/sal/dom/store/impl/DefaultDataChangeListenerTestSuite.java +++ b/opendaylight/md-sal/sal-inmemory-datastore/src/test/java/org/opendaylight/controller/md/sal/dom/store/impl/DefaultDataChangeListenerTestSuite.java @@ -20,7 +20,7 @@ public abstract class DefaultDataChangeListenerTestSuite extends AbstractDataCha abstract protected void customizeTask(DatastoreTestTask task); @Test - public final void putTopLevelOneNested() throws InterruptedException, ExecutionException { + public final void putTopLevelOneNested() throws Exception { DatastoreTestTask task = newTestTask().test(writeOneTopMultipleNested(FOO, BAR)); customizeTask(task); @@ -29,7 +29,7 @@ public abstract class DefaultDataChangeListenerTestSuite extends AbstractDataCha } @Test - public final void existingTopWriteSibling() throws InterruptedException, ExecutionException { + public final void existingTopWriteSibling() throws Exception { DatastoreTestTask task = newTestTask().setup(writeOneTopMultipleNested(FOO)).test( new WriteTransactionCustomizer() { @Override @@ -46,7 +46,7 @@ public abstract class DefaultDataChangeListenerTestSuite extends AbstractDataCha @Test - public final void existingTopWriteTwoNested() throws InterruptedException, ExecutionException { + public final void existingTopWriteTwoNested() throws Exception { DatastoreTestTask task = newTestTask().setup(writeOneTopMultipleNested(FOO)).test( new WriteTransactionCustomizer() { @Override @@ -64,7 +64,7 @@ public abstract class DefaultDataChangeListenerTestSuite extends AbstractDataCha @Test - public final void existingOneNestedWriteAdditionalNested() throws InterruptedException, ExecutionException { + public final void existingOneNestedWriteAdditionalNested() throws Exception { DatastoreTestTask task = newTestTask().setup(writeOneTopMultipleNested(FOO, BAR)).test( new WriteTransactionCustomizer() { @Override @@ -79,11 +79,10 @@ public abstract class DefaultDataChangeListenerTestSuite extends AbstractDataCha protected abstract void existingOneNestedWriteAdditionalNested(DatastoreTestTask task) throws InterruptedException, ExecutionException; - protected abstract void putTopLevelOneNested(DatastoreTestTask task) throws InterruptedException, - ExecutionException; + protected abstract void putTopLevelOneNested(DatastoreTestTask task) throws Exception; @Test - public final void replaceTopLevelNestedChanged() throws InterruptedException, ExecutionException { + public final void replaceTopLevelNestedChanged() throws Exception { DatastoreTestTask task = newTestTask().setup(writeOneTopMultipleNested(FOO, BAR)).test( writeOneTopMultipleNested(FOO, BAZ)); customizeTask(task); @@ -95,7 +94,7 @@ public abstract class DefaultDataChangeListenerTestSuite extends AbstractDataCha ExecutionException; @Test - public final void putTopLevelWithTwoNested() throws InterruptedException, ExecutionException { + public final void putTopLevelWithTwoNested() throws Exception { DatastoreTestTask task = newTestTask().test(writeOneTopMultipleNested(FOO, BAR, BAZ)); customizeTask(task); @@ -107,7 +106,7 @@ public abstract class DefaultDataChangeListenerTestSuite extends AbstractDataCha ExecutionException; @Test - public final void twoNestedExistsOneIsDeleted() throws InterruptedException, ExecutionException { + public final void twoNestedExistsOneIsDeleted() throws Exception { DatastoreTestTask task = newTestTask().setup(writeOneTopMultipleNested(FOO, BAR, BAZ)).test( deleteNested(FOO, BAZ)); @@ -120,7 +119,7 @@ public abstract class DefaultDataChangeListenerTestSuite extends AbstractDataCha ExecutionException; @Test - public final void nestedListExistsRootDeleted() throws InterruptedException, ExecutionException { + public final void nestedListExistsRootDeleted() throws Exception { DatastoreTestTask task = newTestTask().cleanup(null).setup(writeOneTopMultipleNested(FOO, BAR, BAZ)) .test(DatastoreTestTask.simpleDelete(TOP_LEVEL)); diff --git a/opendaylight/md-sal/sal-inmemory-datastore/src/test/java/org/opendaylight/controller/md/sal/dom/store/impl/InMemoryDataStoreTest.java b/opendaylight/md-sal/sal-inmemory-datastore/src/test/java/org/opendaylight/controller/md/sal/dom/store/impl/InMemoryDataStoreTest.java index 9b105aa306..4d38858667 100644 --- a/opendaylight/md-sal/sal-inmemory-datastore/src/test/java/org/opendaylight/controller/md/sal/dom/store/impl/InMemoryDataStoreTest.java +++ b/opendaylight/md-sal/sal-inmemory-datastore/src/test/java/org/opendaylight/controller/md/sal/dom/store/impl/InMemoryDataStoreTest.java @@ -47,7 +47,8 @@ public class InMemoryDataStoreTest { @Before public void setupStore() { - domStore = new InMemoryDOMDataStore("TEST", MoreExecutors.sameThreadExecutor()); + domStore = new InMemoryDOMDataStore("TEST", MoreExecutors.sameThreadExecutor(), + MoreExecutors.sameThreadExecutor()); schemaContext = TestModel.createTestContext(); domStore.onGlobalContextUpdated(schemaContext); } diff --git a/opendaylight/md-sal/sal-inmemory-datastore/src/test/java/org/opendaylight/controller/md/sal/dom/store/impl/RootScopeSubtreeTest.java b/opendaylight/md-sal/sal-inmemory-datastore/src/test/java/org/opendaylight/controller/md/sal/dom/store/impl/RootScopeSubtreeTest.java index 905dc0d19b..43b339e506 100644 --- a/opendaylight/md-sal/sal-inmemory-datastore/src/test/java/org/opendaylight/controller/md/sal/dom/store/impl/RootScopeSubtreeTest.java +++ b/opendaylight/md-sal/sal-inmemory-datastore/src/test/java/org/opendaylight/controller/md/sal/dom/store/impl/RootScopeSubtreeTest.java @@ -23,7 +23,7 @@ public class RootScopeSubtreeTest extends DefaultDataChangeListenerTestSuite { @Override public void putTopLevelOneNested(final DatastoreTestTask task) throws InterruptedException, ExecutionException { - AsyncDataChangeEvent> change = task.getChangeEvent().get(); + AsyncDataChangeEvent> change = task.getChangeEvent(); assertContains(change.getCreatedData(), TOP_LEVEL, path(FOO), path(FOO, BAR)); assertEmpty(change.getUpdatedData()); @@ -34,7 +34,7 @@ public class RootScopeSubtreeTest extends DefaultDataChangeListenerTestSuite { public void replaceTopLevelNestedChanged(final DatastoreTestTask task) throws InterruptedException, ExecutionException { - AsyncDataChangeEvent> change = task.getChangeEvent().get(); + AsyncDataChangeEvent> change = task.getChangeEvent(); assertContains(change.getCreatedData(), path(FOO, BAZ)); assertContains(change.getUpdatedData(), TOP_LEVEL, path(FOO)); @@ -45,7 +45,7 @@ public class RootScopeSubtreeTest extends DefaultDataChangeListenerTestSuite { protected void putTopLevelWithTwoNested(final DatastoreTestTask task) throws InterruptedException, ExecutionException { - AsyncDataChangeEvent> change = task.getChangeEvent().get(); + AsyncDataChangeEvent> change = task.getChangeEvent(); assertContains(change.getCreatedData(), TOP_LEVEL, path(FOO), path(FOO, BAR), path(FOO, BAZ)); assertEmpty(change.getUpdatedData()); @@ -56,7 +56,7 @@ public class RootScopeSubtreeTest extends DefaultDataChangeListenerTestSuite { protected void twoNestedExistsOneIsDeleted(final DatastoreTestTask task) throws InterruptedException, ExecutionException { - AsyncDataChangeEvent> change = task.getChangeEvent().get(); + AsyncDataChangeEvent> change = task.getChangeEvent(); assertEmpty(change.getCreatedData()); assertContains(change.getUpdatedData(), TOP_LEVEL, path(FOO)); @@ -67,7 +67,7 @@ public class RootScopeSubtreeTest extends DefaultDataChangeListenerTestSuite { protected void nestedListExistsRootDeleted(final DatastoreTestTask task) throws InterruptedException, ExecutionException { - AsyncDataChangeEvent> change = task.getChangeEvent().get(); + AsyncDataChangeEvent> change = task.getChangeEvent(); assertEmpty(change.getCreatedData()); assertEmpty(change.getUpdatedData()); @@ -76,7 +76,7 @@ public class RootScopeSubtreeTest extends DefaultDataChangeListenerTestSuite { @Override protected void existingOneNestedWriteAdditionalNested(final DatastoreTestTask task) throws InterruptedException, ExecutionException { - AsyncDataChangeEvent> change = task.getChangeEvent().get(); + AsyncDataChangeEvent> change = task.getChangeEvent(); assertContains(change.getCreatedData(), path(FOO,BAZ)); assertNotContains(change.getCreatedData(), path(FOO,BAR)); @@ -86,7 +86,7 @@ public class RootScopeSubtreeTest extends DefaultDataChangeListenerTestSuite { @Override protected void existingTopWriteTwoNested(final DatastoreTestTask task) throws InterruptedException, ExecutionException { - AsyncDataChangeEvent> change = task.getChangeEvent().get(); + AsyncDataChangeEvent> change = task.getChangeEvent(); assertContains(change.getCreatedData(), path(FOO,BAR),path(FOO,BAZ)); assertContains(change.getUpdatedData(), TOP_LEVEL, path(FOO)); @@ -96,7 +96,7 @@ public class RootScopeSubtreeTest extends DefaultDataChangeListenerTestSuite { @Override protected void existingTopWriteSibling(final DatastoreTestTask task) throws InterruptedException, ExecutionException { - AsyncDataChangeEvent> change = task.getChangeEvent().get(); + AsyncDataChangeEvent> change = task.getChangeEvent(); assertContains(change.getCreatedData(), path(FOO_SIBLING)); assertContains(change.getUpdatedData(), TOP_LEVEL); diff --git a/opendaylight/md-sal/sal-inmemory-datastore/src/test/java/org/opendaylight/controller/md/sal/dom/store/impl/SchemaUpdateForTransactionTest.java b/opendaylight/md-sal/sal-inmemory-datastore/src/test/java/org/opendaylight/controller/md/sal/dom/store/impl/SchemaUpdateForTransactionTest.java index 5cba93a712..364712c7b3 100644 --- a/opendaylight/md-sal/sal-inmemory-datastore/src/test/java/org/opendaylight/controller/md/sal/dom/store/impl/SchemaUpdateForTransactionTest.java +++ b/opendaylight/md-sal/sal-inmemory-datastore/src/test/java/org/opendaylight/controller/md/sal/dom/store/impl/SchemaUpdateForTransactionTest.java @@ -34,7 +34,8 @@ public class SchemaUpdateForTransactionTest { @Before public void setupStore() { - domStore = new InMemoryDOMDataStore("TEST", MoreExecutors.sameThreadExecutor()); + domStore = new InMemoryDOMDataStore("TEST", MoreExecutors.sameThreadExecutor(), + MoreExecutors.sameThreadExecutor()); loadSchemas(RockTheHouseInput.class); } diff --git a/opendaylight/md-sal/sal-inmemory-datastore/src/test/java/org/opendaylight/controller/md/sal/dom/store/impl/TestDCLExecutorService.java b/opendaylight/md-sal/sal-inmemory-datastore/src/test/java/org/opendaylight/controller/md/sal/dom/store/impl/TestDCLExecutorService.java new file mode 100644 index 0000000000..f6e6461bf5 --- /dev/null +++ b/opendaylight/md-sal/sal-inmemory-datastore/src/test/java/org/opendaylight/controller/md/sal/dom/store/impl/TestDCLExecutorService.java @@ -0,0 +1,43 @@ +/* + * Copyright (c) 2014 Brocade Communications Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ + +package org.opendaylight.controller.md.sal.dom.store.impl; + +import java.util.concurrent.ExecutorService; + +import com.google.common.util.concurrent.ForwardingExecutorService; +import com.google.common.util.concurrent.MoreExecutors; + +/** + * A forwarding Executor used by unit tests for DataChangeListener notifications + * + * @author Thomas Pantelis + */ +public class TestDCLExecutorService extends ForwardingExecutorService { + + // Start with a same thread executor to avoid timing issues during test setup. + private volatile ExecutorService currentExecutor = MoreExecutors.sameThreadExecutor(); + + // The real executor to use when test setup is complete. + private final ExecutorService postSetupExecutor; + + + public TestDCLExecutorService( ExecutorService postSetupExecutor ) { + this.postSetupExecutor = postSetupExecutor; + } + + @Override + protected ExecutorService delegate() { + return currentExecutor; + } + + public void afterTestSetup() { + // Test setup complete - switch to the real executor. + currentExecutor = postSetupExecutor; + } +} \ No newline at end of file diff --git a/opendaylight/md-sal/sal-inmemory-datastore/src/test/java/org/opendaylight/controller/md/sal/dom/store/impl/WildcardedScopeBaseTest.java b/opendaylight/md-sal/sal-inmemory-datastore/src/test/java/org/opendaylight/controller/md/sal/dom/store/impl/WildcardedScopeBaseTest.java index 7c8676eff5..cdf465aace 100644 --- a/opendaylight/md-sal/sal-inmemory-datastore/src/test/java/org/opendaylight/controller/md/sal/dom/store/impl/WildcardedScopeBaseTest.java +++ b/opendaylight/md-sal/sal-inmemory-datastore/src/test/java/org/opendaylight/controller/md/sal/dom/store/impl/WildcardedScopeBaseTest.java @@ -11,8 +11,6 @@ import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; import java.util.concurrent.ExecutionException; -import java.util.concurrent.Future; - import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataChangeScope; import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeEvent; import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.test.list.rev140701.two.level.list.TopLevelList; @@ -32,7 +30,7 @@ public class WildcardedScopeBaseTest extends DefaultDataChangeListenerTestSuite @Override public void putTopLevelOneNested(final DatastoreTestTask task) throws InterruptedException, ExecutionException { - AsyncDataChangeEvent> change = task.getChangeEvent().get(); + AsyncDataChangeEvent> change = task.getChangeEvent(); assertNotNull(change); @@ -48,7 +46,7 @@ public class WildcardedScopeBaseTest extends DefaultDataChangeListenerTestSuite public void replaceTopLevelNestedChanged(final DatastoreTestTask task) throws InterruptedException, ExecutionException { - AsyncDataChangeEvent> change = task.getChangeEvent().get(); + AsyncDataChangeEvent> change = task.getChangeEvent(); assertNotNull(change); assertContains(change.getCreatedData(), path(FOO, BAZ)); @@ -62,7 +60,7 @@ public class WildcardedScopeBaseTest extends DefaultDataChangeListenerTestSuite protected void putTopLevelWithTwoNested(final DatastoreTestTask task) throws InterruptedException, ExecutionException { - AsyncDataChangeEvent> change = task.getChangeEvent().get(); + AsyncDataChangeEvent> change = task.getChangeEvent(); assertNotNull(change); assertFalse(change.getCreatedData().isEmpty()); @@ -77,7 +75,6 @@ public class WildcardedScopeBaseTest extends DefaultDataChangeListenerTestSuite protected void twoNestedExistsOneIsDeleted(final DatastoreTestTask task) throws InterruptedException, ExecutionException { - Future future = task.getChangeEvent(); /* * Base listener should be notified only and only if actual node changed its state, * since deletion of child, did not result in change of node we are listening @@ -85,14 +82,14 @@ public class WildcardedScopeBaseTest extends DefaultDataChangeListenerTestSuite * and this means settable future containing receivedDataChangeEvent is not done. * */ - assertFalse(future.isDone()); + task.verifyNoChangeEvent(); } @Override public void nestedListExistsRootDeleted(final DatastoreTestTask task) throws InterruptedException, ExecutionException { - AsyncDataChangeEvent> change = task.getChangeEvent().get(); + AsyncDataChangeEvent> change = task.getChangeEvent(); assertEmpty(change.getCreatedData()); assertEmpty(change.getUpdatedData()); @@ -103,7 +100,6 @@ public class WildcardedScopeBaseTest extends DefaultDataChangeListenerTestSuite @Override protected void existingOneNestedWriteAdditionalNested(final DatastoreTestTask task) { - Future future = task.getChangeEvent(); /* * One listener should be notified only and only if actual node changed its state, * since deletion of nested child (in this case /nested-list/nested-list[foo], @@ -112,12 +108,11 @@ public class WildcardedScopeBaseTest extends DefaultDataChangeListenerTestSuite * and this means settable future containing receivedDataChangeEvent is not done. * */ - assertFalse(future.isDone()); + task.verifyNoChangeEvent(); } @Override protected void existingTopWriteTwoNested(final DatastoreTestTask task) throws InterruptedException, ExecutionException { - Future future = task.getChangeEvent(); /* * One listener should be notified only and only if actual node changed its state, * since deletion of nested child (in this case /nested-list/nested-list[foo], @@ -126,12 +121,12 @@ public class WildcardedScopeBaseTest extends DefaultDataChangeListenerTestSuite * and this means settable future containing receivedDataChangeEvent is not done. * */ - assertFalse(future.isDone()); + task.verifyNoChangeEvent(); } @Override protected void existingTopWriteSibling(final DatastoreTestTask task) throws InterruptedException, ExecutionException { - AsyncDataChangeEvent> change = task.getChangeEvent().get(); + AsyncDataChangeEvent> change = task.getChangeEvent(); assertContains(change.getCreatedData(), path(FOO_SIBLING)); assertNotContains(change.getUpdatedData(), path(FOO), TOP_LEVEL); diff --git a/opendaylight/md-sal/sal-inmemory-datastore/src/test/java/org/opendaylight/controller/md/sal/dom/store/impl/WildcardedScopeOneTest.java b/opendaylight/md-sal/sal-inmemory-datastore/src/test/java/org/opendaylight/controller/md/sal/dom/store/impl/WildcardedScopeOneTest.java index ac18d5c976..3407e0ffa4 100644 --- a/opendaylight/md-sal/sal-inmemory-datastore/src/test/java/org/opendaylight/controller/md/sal/dom/store/impl/WildcardedScopeOneTest.java +++ b/opendaylight/md-sal/sal-inmemory-datastore/src/test/java/org/opendaylight/controller/md/sal/dom/store/impl/WildcardedScopeOneTest.java @@ -11,8 +11,6 @@ import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; import java.util.concurrent.ExecutionException; -import java.util.concurrent.Future; - import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataChangeScope; import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeEvent; import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.test.list.rev140701.two.level.list.TopLevelList; @@ -32,7 +30,7 @@ public class WildcardedScopeOneTest extends DefaultDataChangeListenerTestSuite { @Override public void putTopLevelOneNested(final DatastoreTestTask task) throws InterruptedException, ExecutionException { - AsyncDataChangeEvent> change = task.getChangeEvent().get(); + AsyncDataChangeEvent> change = task.getChangeEvent(); assertNotNull(change); @@ -48,7 +46,7 @@ public class WildcardedScopeOneTest extends DefaultDataChangeListenerTestSuite { public void replaceTopLevelNestedChanged(final DatastoreTestTask task) throws InterruptedException, ExecutionException { - AsyncDataChangeEvent> change = task.getChangeEvent().get(); + AsyncDataChangeEvent> change = task.getChangeEvent(); assertNotNull(change); assertContains(change.getCreatedData(), path(FOO, BAZ)); @@ -62,7 +60,7 @@ public class WildcardedScopeOneTest extends DefaultDataChangeListenerTestSuite { protected void putTopLevelWithTwoNested(final DatastoreTestTask task) throws InterruptedException, ExecutionException { - AsyncDataChangeEvent> change = task.getChangeEvent().get(); + AsyncDataChangeEvent> change = task.getChangeEvent(); assertNotNull(change); assertFalse(change.getCreatedData().isEmpty()); @@ -77,7 +75,6 @@ public class WildcardedScopeOneTest extends DefaultDataChangeListenerTestSuite { protected void twoNestedExistsOneIsDeleted(final DatastoreTestTask task) throws InterruptedException, ExecutionException { - Future future = task.getChangeEvent(); /* * One listener should be notified only and only if actual node changed its state, * since deletion of nested child (in this case /nested-list/nested-list[foo], @@ -86,14 +83,14 @@ public class WildcardedScopeOneTest extends DefaultDataChangeListenerTestSuite { * and this means settable future containing receivedDataChangeEvent is not done. * */ - assertFalse(future.isDone()); + task.verifyNoChangeEvent(); } @Override public void nestedListExistsRootDeleted(final DatastoreTestTask task) throws InterruptedException, ExecutionException { - AsyncDataChangeEvent> change = task.getChangeEvent().get(); + AsyncDataChangeEvent> change = task.getChangeEvent(); assertEmpty(change.getCreatedData()); assertEmpty(change.getUpdatedData()); @@ -104,7 +101,6 @@ public class WildcardedScopeOneTest extends DefaultDataChangeListenerTestSuite { @Override protected void existingOneNestedWriteAdditionalNested(final DatastoreTestTask task) { - Future future = task.getChangeEvent(); /* * One listener should be notified only and only if actual node changed its state, * since deletion of nested child (in this case /nested-list/nested-list[foo], @@ -113,12 +109,11 @@ public class WildcardedScopeOneTest extends DefaultDataChangeListenerTestSuite { * and this means settable future containing receivedDataChangeEvent is not done. * */ - assertFalse(future.isDone()); + task.verifyNoChangeEvent(); } @Override protected void existingTopWriteTwoNested(final DatastoreTestTask task) throws InterruptedException, ExecutionException { - Future future = task.getChangeEvent(); /* * One listener should be notified only and only if actual node changed its state, * since deletion of nested child (in this case /nested-list/nested-list[foo], @@ -127,12 +122,12 @@ public class WildcardedScopeOneTest extends DefaultDataChangeListenerTestSuite { * and this means settable future containing receivedDataChangeEvent is not done. * */ - assertFalse(future.isDone()); + task.verifyNoChangeEvent(); } @Override protected void existingTopWriteSibling(final DatastoreTestTask task) throws InterruptedException, ExecutionException { - AsyncDataChangeEvent> change = task.getChangeEvent().get(); + AsyncDataChangeEvent> change = task.getChangeEvent(); assertContains(change.getCreatedData(), path(FOO_SIBLING)); assertNotContains(change.getUpdatedData(),path(FOO), TOP_LEVEL); diff --git a/opendaylight/md-sal/sal-inmemory-datastore/src/test/java/org/opendaylight/controller/md/sal/dom/store/impl/WildcardedScopeSubtreeTest.java b/opendaylight/md-sal/sal-inmemory-datastore/src/test/java/org/opendaylight/controller/md/sal/dom/store/impl/WildcardedScopeSubtreeTest.java index 7e67242dd3..a7fa24f293 100644 --- a/opendaylight/md-sal/sal-inmemory-datastore/src/test/java/org/opendaylight/controller/md/sal/dom/store/impl/WildcardedScopeSubtreeTest.java +++ b/opendaylight/md-sal/sal-inmemory-datastore/src/test/java/org/opendaylight/controller/md/sal/dom/store/impl/WildcardedScopeSubtreeTest.java @@ -32,7 +32,7 @@ public class WildcardedScopeSubtreeTest extends DefaultDataChangeListenerTestSui @Override public void putTopLevelOneNested(final DatastoreTestTask task) throws InterruptedException, ExecutionException { - AsyncDataChangeEvent> change = task.getChangeEvent().get(); + AsyncDataChangeEvent> change = task.getChangeEvent(); assertNotContains(change.getCreatedData(), TOP_LEVEL); assertContains(change.getCreatedData(), path(FOO), path(FOO, BAR)); @@ -45,7 +45,7 @@ public class WildcardedScopeSubtreeTest extends DefaultDataChangeListenerTestSui public void replaceTopLevelNestedChanged(final DatastoreTestTask task) throws InterruptedException, ExecutionException { - AsyncDataChangeEvent> change = task.getChangeEvent().get(); + AsyncDataChangeEvent> change = task.getChangeEvent(); assertNotNull(change); assertContains(change.getCreatedData(), path(FOO, BAZ)); @@ -59,7 +59,7 @@ public class WildcardedScopeSubtreeTest extends DefaultDataChangeListenerTestSui protected void putTopLevelWithTwoNested(final DatastoreTestTask task) throws InterruptedException, ExecutionException { - AsyncDataChangeEvent> change = task.getChangeEvent().get(); + AsyncDataChangeEvent> change = task.getChangeEvent(); assertNotNull(change); assertFalse(change.getCreatedData().isEmpty()); @@ -74,7 +74,7 @@ public class WildcardedScopeSubtreeTest extends DefaultDataChangeListenerTestSui protected void twoNestedExistsOneIsDeleted(final DatastoreTestTask task) throws InterruptedException, ExecutionException { - AsyncDataChangeEvent> change = task.getChangeEvent().get(); + AsyncDataChangeEvent> change = task.getChangeEvent(); assertNotNull(change); assertTrue(change.getCreatedData().isEmpty()); assertContains(change.getUpdatedData(), path(FOO)); @@ -86,7 +86,7 @@ public class WildcardedScopeSubtreeTest extends DefaultDataChangeListenerTestSui public void nestedListExistsRootDeleted(final DatastoreTestTask task) throws InterruptedException, ExecutionException { - AsyncDataChangeEvent> change = task.getChangeEvent().get(); + AsyncDataChangeEvent> change = task.getChangeEvent(); assertEmpty(change.getCreatedData()); assertEmpty(change.getUpdatedData()); @@ -97,7 +97,7 @@ public class WildcardedScopeSubtreeTest extends DefaultDataChangeListenerTestSui @Override protected void existingOneNestedWriteAdditionalNested(final DatastoreTestTask task) throws InterruptedException, ExecutionException { - AsyncDataChangeEvent> change = task.getChangeEvent().get(); + AsyncDataChangeEvent> change = task.getChangeEvent(); assertContains(change.getCreatedData(), path(FOO,BAZ)); assertNotContains(change.getCreatedData(), path(FOO,BAR)); @@ -108,7 +108,7 @@ public class WildcardedScopeSubtreeTest extends DefaultDataChangeListenerTestSui @Override protected void existingTopWriteTwoNested(final DatastoreTestTask task) throws InterruptedException, ExecutionException { - AsyncDataChangeEvent> change = task.getChangeEvent().get(); + AsyncDataChangeEvent> change = task.getChangeEvent(); assertContains(change.getCreatedData(), path(FOO,BAR),path(FOO,BAZ)); assertContains(change.getUpdatedData(), path(FOO)); @@ -118,7 +118,7 @@ public class WildcardedScopeSubtreeTest extends DefaultDataChangeListenerTestSui @Override protected void existingTopWriteSibling(final DatastoreTestTask task) throws InterruptedException, ExecutionException { - AsyncDataChangeEvent> change = task.getChangeEvent().get(); + AsyncDataChangeEvent> change = task.getChangeEvent(); assertContains(change.getCreatedData(), path(FOO_SIBLING)); assertNotContains(change.getUpdatedData(), path(FOO), TOP_LEVEL); -- 2.36.6