Bug 1430: Off-load notifications from single commit thread 22/9422/12
authortpantelis <tpanteli@brocade.com>
Tue, 29 Jul 2014 10:42:14 +0000 (06:42 -0400)
committertpantelis <tpanteli@brocade.com>
Thu, 7 Aug 2014 05:08:41 +0000 (01:08 -0400)
Modified the InMemoryDOMDataStore to use the new
QueuedNotificationManager class added to yangtools common util for
DataChangeListener notifications.

Modified DOMDataCommitCoordinatorImpl's ListeningExecutorService to one
that off-loads ListenableFuture Runnable callbacks on a separate
executor.

Change-Id: I31f2fb002131c6d91b205d33255dd1bbc6433d9b
Signed-off-by: tpantelis <tpanteli@brocade.com>
29 files changed:
opendaylight/md-sal/sal-binding-broker/src/test/java/org/opendaylight/controller/md/sal/binding/test/DataBrokerTestCustomizer.java
opendaylight/md-sal/sal-binding-broker/src/test/java/org/opendaylight/controller/sal/binding/test/util/BindingTestContext.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DistributedDataStore.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DataChangeListenerRegistrationTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTransactionChainTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTransactionTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/modification/AbstractModificationTest.java
opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/config/yang/md/sal/dom/impl/DomInmemoryDataBrokerModule.java
opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/md/sal/dom/broker/impl/DOMDataCommitCoordinatorImpl.java
opendaylight/md-sal/sal-dom-broker/src/test/java/org/opendaylight/controller/md/sal/dom/broker/impl/DOMBrokerPerformanceTest.java
opendaylight/md-sal/sal-dom-broker/src/test/java/org/opendaylight/controller/md/sal/dom/broker/impl/DOMBrokerTest.java
opendaylight/md-sal/sal-dom-broker/src/test/java/org/opendaylight/controller/md/sal/dom/broker/impl/DOMTransactionChainTest.java
opendaylight/md-sal/sal-inmemory-datastore/src/main/java/org/opendaylight/controller/config/yang/inmemory_datastore_provider/InMemoryConfigDataStoreProviderModule.java
opendaylight/md-sal/sal-inmemory-datastore/src/main/java/org/opendaylight/controller/config/yang/inmemory_datastore_provider/InMemoryOperationalDataStoreProviderModule.java
opendaylight/md-sal/sal-inmemory-datastore/src/main/java/org/opendaylight/controller/md/sal/dom/store/impl/ChangeListenerNotifyTask.java
opendaylight/md-sal/sal-inmemory-datastore/src/main/java/org/opendaylight/controller/md/sal/dom/store/impl/InMemoryDOMDataStore.java
opendaylight/md-sal/sal-inmemory-datastore/src/main/java/org/opendaylight/controller/md/sal/dom/store/impl/InMemoryDOMDataStoreFactory.java [new file with mode: 0644]
opendaylight/md-sal/sal-inmemory-datastore/src/main/java/org/opendaylight/controller/md/sal/dom/store/impl/ResolveDataChangeEventsTask.java
opendaylight/md-sal/sal-inmemory-datastore/src/test/java/org/opendaylight/controller/md/sal/dom/store/impl/AbstractDataChangeListenerTest.java
opendaylight/md-sal/sal-inmemory-datastore/src/test/java/org/opendaylight/controller/md/sal/dom/store/impl/DatastoreTestTask.java
opendaylight/md-sal/sal-inmemory-datastore/src/test/java/org/opendaylight/controller/md/sal/dom/store/impl/DefaultDataChangeListenerTestSuite.java
opendaylight/md-sal/sal-inmemory-datastore/src/test/java/org/opendaylight/controller/md/sal/dom/store/impl/InMemoryDataStoreTest.java
opendaylight/md-sal/sal-inmemory-datastore/src/test/java/org/opendaylight/controller/md/sal/dom/store/impl/RootScopeSubtreeTest.java
opendaylight/md-sal/sal-inmemory-datastore/src/test/java/org/opendaylight/controller/md/sal/dom/store/impl/SchemaUpdateForTransactionTest.java
opendaylight/md-sal/sal-inmemory-datastore/src/test/java/org/opendaylight/controller/md/sal/dom/store/impl/TestDCLExecutorService.java [new file with mode: 0644]
opendaylight/md-sal/sal-inmemory-datastore/src/test/java/org/opendaylight/controller/md/sal/dom/store/impl/WildcardedScopeBaseTest.java
opendaylight/md-sal/sal-inmemory-datastore/src/test/java/org/opendaylight/controller/md/sal/dom/store/impl/WildcardedScopeOneTest.java
opendaylight/md-sal/sal-inmemory-datastore/src/test/java/org/opendaylight/controller/md/sal/dom/store/impl/WildcardedScopeSubtreeTest.java

index e0f6f3546f528a2e72621447b8110dc9e377be71..666c819c82ce45ad55e8bf4f3c83c6ad7da349cd 100644 (file)
@@ -46,13 +46,15 @@ public class DataBrokerTestCustomizer {
     }
 
     public DOMStore createConfigurationDatastore() {
-        InMemoryDOMDataStore store = new InMemoryDOMDataStore("CFG", MoreExecutors.sameThreadExecutor());
+        InMemoryDOMDataStore store = new InMemoryDOMDataStore("CFG",
+                MoreExecutors.sameThreadExecutor(), MoreExecutors.sameThreadExecutor());
         schemaService.registerSchemaContextListener(store);
         return store;
     }
 
     public DOMStore createOperationalDatastore() {
-        InMemoryDOMDataStore store = new InMemoryDOMDataStore("OPER", MoreExecutors.sameThreadExecutor());
+        InMemoryDOMDataStore store = new InMemoryDOMDataStore("OPER",
+                MoreExecutors.sameThreadExecutor(), MoreExecutors.sameThreadExecutor());
         schemaService.registerSchemaContextListener(store);
         return store;
     }
index deb4a8aecacfbcffe47a2cee869c7d117c68f801..fef5715f50deea8ac6038b8684dc832c779cf582 100644 (file)
@@ -63,6 +63,7 @@ import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.MutableClassToInstanceMap;
 import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
 
 @Beta
 public class BindingTestContext implements AutoCloseable {
@@ -133,8 +134,10 @@ public class BindingTestContext implements AutoCloseable {
 
     public void startNewDomDataBroker() {
         checkState(executor != null, "Executor needs to be set");
-        InMemoryDOMDataStore operStore = new InMemoryDOMDataStore("OPER", executor);
-        InMemoryDOMDataStore configStore = new InMemoryDOMDataStore("CFG", executor);
+        InMemoryDOMDataStore operStore = new InMemoryDOMDataStore("OPER", executor,
+                MoreExecutors.sameThreadExecutor());
+        InMemoryDOMDataStore configStore = new InMemoryDOMDataStore("CFG", executor,
+                MoreExecutors.sameThreadExecutor());
         newDatastores = ImmutableMap.<LogicalDatastoreType, DOMStore>builder()
                 .put(LogicalDatastoreType.OPERATIONAL, operStore)
                 .put(LogicalDatastoreType.CONFIGURATION, configStore)
index 479af79748033342041f32fe5221e68f78bf1c2f..c43307643b49539b6af395f8db7a5822725ee098 100644 (file)
@@ -25,6 +25,8 @@ import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransactio
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransactionChain;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
 import org.opendaylight.yangtools.concepts.ListenerRegistration;
+import org.opendaylight.yangtools.util.PropertyUtils;
+import org.opendaylight.yangtools.util.concurrent.SpecialExecutors;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
@@ -32,8 +34,6 @@ import org.opendaylight.yangtools.yang.model.api.SchemaContextListener;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.concurrent.Executors;
-
 /**
  *
  */
@@ -42,25 +42,34 @@ public class DistributedDataStore implements DOMStore, SchemaContextListener, Au
     private static final Logger
         LOG = LoggerFactory.getLogger(DistributedDataStore.class);
 
-    private static final int DEFAULT_EXECUTOR_POOL_SIZE = 10;
+    private static final String EXECUTOR_MAX_POOL_SIZE_PROP =
+            "mdsal.dist-datastore-executor-pool.size";
+    private static final int DEFAULT_EXECUTOR_MAX_POOL_SIZE = 10;
+
+    private static final String EXECUTOR_MAX_QUEUE_SIZE_PROP =
+            "mdsal.dist-datastore-executor-queue.size";
+    private static final int DEFAULT_EXECUTOR_MAX_QUEUE_SIZE = 5000;
 
     private final String type;
     private final ActorContext actorContext;
 
     private SchemaContext schemaContext;
 
-
-
     /**
      * Executor used to run FutureTask's
      *
      * This is typically used when we need to make a request to an actor and
      * wait for it's response and the consumer needs to be provided a Future.
-     *
-     * FIXME : Make the thread pool size configurable.
      */
     private final ListeningExecutorService executor =
-        MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(DEFAULT_EXECUTOR_POOL_SIZE));
+            MoreExecutors.listeningDecorator(
+                    SpecialExecutors.newBlockingBoundedFastThreadPool(
+                            PropertyUtils.getIntSystemProperty(
+                                    EXECUTOR_MAX_POOL_SIZE_PROP,
+                                    DEFAULT_EXECUTOR_MAX_POOL_SIZE),
+                            PropertyUtils.getIntSystemProperty(
+                                    EXECUTOR_MAX_QUEUE_SIZE_PROP,
+                                    DEFAULT_EXECUTOR_MAX_QUEUE_SIZE), "DistDataStore"));
 
     public DistributedDataStore(ActorSystem actorSystem, String type, ClusterWrapper cluster, Configuration configuration) {
         this(new ActorContext(actorSystem, actorSystem
index 10dbbc84d873ee54b5421643c00b046043e58114..8f058a34c2a6a47497cee03b806b21f102c4c478 100644 (file)
@@ -17,8 +17,6 @@ import akka.japi.Creator;
 import akka.serialization.Serialization;
 import com.google.common.base.Optional;
 import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.ListeningExecutorService;
-import com.google.common.util.concurrent.MoreExecutors;
 import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardMBeanFactory;
 import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats;
 import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
@@ -39,6 +37,7 @@ import org.opendaylight.controller.cluster.raft.DefaultConfigParamsImpl;
 import org.opendaylight.controller.cluster.raft.RaftActor;
 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
 import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore;
+import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStoreFactory;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransactionChain;
@@ -52,7 +51,6 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 
 /**
@@ -67,9 +65,6 @@ public class Shard extends RaftActor {
 
     public static final String DEFAULT_NAME = "default";
 
-    private final ListeningExecutorService storeExecutor =
-        MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(2));
-
     private final InMemoryDOMDataStore store;
 
     private final Map<Object, DOMStoreThreePhaseCommitCohort>
@@ -101,7 +96,7 @@ public class Shard extends RaftActor {
 
         LOG.info("Creating shard : {} persistent : {}", name, persistent);
 
-        store = new InMemoryDOMDataStore(name, storeExecutor);
+        store = InMemoryDOMDataStoreFactory.create(name, null);
 
         shardMBean = ShardMBeanFactory.getShardStatsMBean(name);
 
index 920248521a297871f95c5312f1d34085900feced..eb2c24292aee6663a669e9b42ba29a82d966fbd6 100644 (file)
@@ -21,7 +21,8 @@ import static org.junit.Assert.assertEquals;
 public class DataChangeListenerRegistrationTest extends AbstractActorTest {
   private static ListeningExecutorService storeExecutor = MoreExecutors.listeningDecorator(MoreExecutors.sameThreadExecutor());
 
-  private static final InMemoryDOMDataStore store = new InMemoryDOMDataStore("OPER", storeExecutor);
+  private static final InMemoryDOMDataStore store = new InMemoryDOMDataStore("OPER", storeExecutor,
+          MoreExecutors.sameThreadExecutor());
 
   static {
     store.onGlobalContextUpdated(TestModel.createTestContext());
@@ -37,12 +38,14 @@ public class DataChangeListenerRegistrationTest extends AbstractActorTest {
       final ActorRef subject = getSystem().actorOf(props, "testCloseListenerRegistration");
 
       new Within(duration("1 seconds")) {
+        @Override
         protected void run() {
 
           subject.tell(new CloseDataChangeListenerRegistration().toSerializable(), getRef());
 
           final String out = new ExpectMsg<String>(duration("1 seconds"), "match hint") {
             // do not put code outside this method, will run afterwards
+            @Override
             protected String match(Object in) {
               if (in.getClass().equals(CloseDataChangeListenerRegistrationReply.SERIALIZABLE_CLASS)) {
                 return "match";
index b35880a6a501367a4c1155b3cae4ef405352ddb6..d468af6664981d08ad603b1a841fefbdaccc8d47 100644 (file)
@@ -19,7 +19,8 @@ public class ShardTransactionChainTest extends AbstractActorTest {
 
   private static ListeningExecutorService storeExecutor = MoreExecutors.listeningDecorator(MoreExecutors.sameThreadExecutor());
 
-  private static final InMemoryDOMDataStore store = new InMemoryDOMDataStore("OPER", storeExecutor);
+  private static final InMemoryDOMDataStore store = new InMemoryDOMDataStore("OPER", storeExecutor,
+          MoreExecutors.sameThreadExecutor());
 
   static {
     store.onGlobalContextUpdated(TestModel.createTestContext());
@@ -31,12 +32,14 @@ public class ShardTransactionChainTest extends AbstractActorTest {
       final ActorRef subject = getSystem().actorOf(props, "testCreateTransaction");
 
      new Within(duration("1 seconds")) {
+        @Override
         protected void run() {
 
           subject.tell(new CreateTransaction("txn-1", TransactionProxy.TransactionType.READ_ONLY.ordinal() ).toSerializable(), getRef());
 
           final String out = new ExpectMsg<String>(duration("1 seconds"), "match hint") {
             // do not put code outside this method, will run afterwards
+            @Override
             protected String match(Object in) {
               if (in.getClass().equals(CreateTransactionReply.SERIALIZABLE_CLASS)) {
                 return CreateTransactionReply.fromSerializable(in).getTransactionPath();
@@ -66,12 +69,14 @@ public class ShardTransactionChainTest extends AbstractActorTest {
       final ActorRef subject = getSystem().actorOf(props, "testCloseTransactionChain");
 
       new Within(duration("1 seconds")) {
+        @Override
         protected void run() {
 
           subject.tell(new CloseTransactionChain().toSerializable(), getRef());
 
           final String out = new ExpectMsg<String>(duration("1 seconds"), "match hint") {
             // do not put code outside this method, will run afterwards
+            @Override
             protected String match(Object in) {
               if (in.getClass().equals(CloseTransactionChainReply.SERIALIZABLE_CLASS)) {
                 return "match";
index 632ecc29cd31b727f714be859154a53182ade178..6fe5154d555b31b65869d25d77289abf67258172 100644 (file)
@@ -42,7 +42,7 @@ public class ShardTransactionTest extends AbstractActorTest {
         MoreExecutors.listeningDecorator(MoreExecutors.sameThreadExecutor());
 
     private static final InMemoryDOMDataStore store =
-        new InMemoryDOMDataStore("OPER", storeExecutor);
+        new InMemoryDOMDataStore("OPER", storeExecutor, MoreExecutors.sameThreadExecutor());
 
     private static final SchemaContext testSchemaContext = TestModel.createTestContext();
 
@@ -59,6 +59,7 @@ public class ShardTransactionTest extends AbstractActorTest {
             final ActorRef subject = getSystem().actorOf(props, "testReadData");
 
             new Within(duration("1 seconds")) {
+                @Override
                 protected void run() {
 
                     subject.tell(
@@ -67,6 +68,7 @@ public class ShardTransactionTest extends AbstractActorTest {
 
                     final String out = new ExpectMsg<String>(duration("1 seconds"), "match hint") {
                         // do not put code outside this method, will run afterwards
+                        @Override
                         protected String match(Object in) {
                             if (in.getClass().equals(ReadDataReply.SERIALIZABLE_CLASS)) {
                               if (ReadDataReply.fromSerializable(testSchemaContext,YangInstanceIdentifier.builder().build(), in)
@@ -99,6 +101,7 @@ public class ShardTransactionTest extends AbstractActorTest {
             final ActorRef subject = getSystem().actorOf(props, "testReadDataWhenDataNotFound");
 
             new Within(duration("1 seconds")) {
+                @Override
                 protected void run() {
 
                     subject.tell(
@@ -107,6 +110,7 @@ public class ShardTransactionTest extends AbstractActorTest {
 
                     final String out = new ExpectMsg<String>(duration("1 seconds"), "match hint") {
                         // do not put code outside this method, will run afterwards
+                        @Override
                         protected String match(Object in) {
                             if (in.getClass().equals(ReadDataReply.SERIALIZABLE_CLASS)) {
                                 if (ReadDataReply.fromSerializable(testSchemaContext,TestModel.TEST_PATH, in)
@@ -135,6 +139,7 @@ public class ShardTransactionTest extends AbstractActorTest {
         final Class<? extends Modification> modificationType) {
         new JavaTestKit(getSystem()) {{
             new Within(duration("1 seconds")) {
+                @Override
                 protected void run() {
                     subject
                         .tell(new ShardTransaction.GetCompositedModification(),
@@ -143,6 +148,7 @@ public class ShardTransactionTest extends AbstractActorTest {
                     final CompositeModification compositeModification =
                         new ExpectMsg<CompositeModification>(duration("1 seconds"), "match hint") {
                             // do not put code outside this method, will run afterwards
+                            @Override
                             protected CompositeModification match(Object in) {
                                 if (in instanceof ShardTransaction.GetCompositeModificationReply) {
                                     return ((ShardTransaction.GetCompositeModificationReply) in)
@@ -174,6 +180,7 @@ public class ShardTransactionTest extends AbstractActorTest {
                 getSystem().actorOf(props, "testWriteData");
 
             new Within(duration("1 seconds")) {
+                @Override
                 protected void run() {
 
                     subject.tell(new WriteData(TestModel.TEST_PATH,
@@ -182,6 +189,7 @@ public class ShardTransactionTest extends AbstractActorTest {
 
                     final String out = new ExpectMsg<String>(duration("1 seconds"), "match hint") {
                         // do not put code outside this method, will run afterwards
+                        @Override
                         protected String match(Object in) {
                             if (in.getClass().equals(WriteDataReply.SERIALIZABLE_CLASS)) {
                                 return "match";
@@ -212,6 +220,7 @@ public class ShardTransactionTest extends AbstractActorTest {
                 getSystem().actorOf(props, "testMergeData");
 
             new Within(duration("1 seconds")) {
+                @Override
                 protected void run() {
 
                     subject.tell(new MergeData(TestModel.TEST_PATH,
@@ -220,6 +229,7 @@ public class ShardTransactionTest extends AbstractActorTest {
 
                     final String out = new ExpectMsg<String>(duration("500 milliseconds"), "match hint") {
                         // do not put code outside this method, will run afterwards
+                        @Override
                         protected String match(Object in) {
                             if (in.getClass().equals(MergeDataReply.SERIALIZABLE_CLASS)) {
                                 return "match";
@@ -251,12 +261,14 @@ public class ShardTransactionTest extends AbstractActorTest {
                 getSystem().actorOf(props, "testDeleteData");
 
             new Within(duration("1 seconds")) {
+                @Override
                 protected void run() {
 
                     subject.tell(new DeleteData(TestModel.TEST_PATH).toSerializable(), getRef());
 
                     final String out = new ExpectMsg<String>(duration("1 seconds"), "match hint") {
                         // do not put code outside this method, will run afterwards
+                        @Override
                         protected String match(Object in) {
                             if (in.getClass().equals(DeleteDataReply.SERIALIZABLE_CLASS)) {
                                 return "match";
@@ -288,12 +300,14 @@ public class ShardTransactionTest extends AbstractActorTest {
                 getSystem().actorOf(props, "testReadyTransaction");
 
             new Within(duration("1 seconds")) {
+                @Override
                 protected void run() {
 
                     subject.tell(new ReadyTransaction().toSerializable(), getRef());
 
                     final String out = new ExpectMsg<String>(duration("1 seconds"), "match hint") {
                         // do not put code outside this method, will run afterwards
+                        @Override
                         protected String match(Object in) {
                             if (in.getClass().equals(ReadyTransactionReply.SERIALIZABLE_CLASS)) {
                                 return "match";
@@ -326,12 +340,14 @@ public class ShardTransactionTest extends AbstractActorTest {
             watch(subject);
 
             new Within(duration("2 seconds")) {
+                @Override
                 protected void run() {
 
                     subject.tell(new CloseTransaction().toSerializable(), getRef());
 
                     final String out = new ExpectMsg<String>(duration("1 seconds"), "match hint") {
                         // do not put code outside this method, will run afterwards
+                        @Override
                         protected String match(Object in) {
                             if (in.getClass().equals(CloseTransactionReply.SERIALIZABLE_CLASS)) {
                                 return "match";
@@ -345,6 +361,7 @@ public class ShardTransactionTest extends AbstractActorTest {
 
                     final String termination = new ExpectMsg<String>(duration("1 seconds"), "match hint") {
                         // do not put code outside this method, will run afterwards
+                        @Override
                         protected String match(Object in) {
                             if (in instanceof Terminated) {
                                 return "match";
index d9c550a6db482d39855dc5a48ebb26b2fec8c6ad..84f3b92f1ba6dba08ae7a29992fe7d9e39b4e33c 100644 (file)
@@ -26,7 +26,8 @@ public abstract class AbstractModificationTest {
 
   @Before
   public void setUp(){
-    store = new InMemoryDOMDataStore("test", MoreExecutors.sameThreadExecutor());
+    store = new InMemoryDOMDataStore("test", MoreExecutors.sameThreadExecutor(),
+            MoreExecutors.sameThreadExecutor());
     store.onGlobalContextUpdated(TestModel.createTestContext());
   }
 
index 22dad6af23c1464b63f8a4aa25075551b64ef686..948f3c8d8b637b8dfb72fdd376fa7ee1f49aa3c3 100644 (file)
@@ -7,18 +7,18 @@
  */
 package org.opendaylight.controller.config.yang.md.sal.dom.impl;
 
-import java.util.concurrent.Executors;
-
+import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
 import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitDeadlockException;
 import org.opendaylight.controller.md.sal.dom.broker.impl.DOMDataBrokerImpl;
-import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore;
+import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStoreFactory;
 import org.opendaylight.controller.sal.core.spi.data.DOMStore;
 import org.opendaylight.yangtools.util.concurrent.DeadlockDetectingListeningExecutorService;
+import org.opendaylight.yangtools.util.concurrent.SpecialExecutors;
+import org.opendaylight.yangtools.util.PropertyUtils;
 
 import com.google.common.collect.ImmutableMap;
-import com.google.common.util.concurrent.ListeningExecutorService;
-import com.google.common.util.concurrent.MoreExecutors;
 
 /**
 *
@@ -26,6 +26,17 @@ import com.google.common.util.concurrent.MoreExecutors;
 public final class DomInmemoryDataBrokerModule extends
         org.opendaylight.controller.config.yang.md.sal.dom.impl.AbstractDomInmemoryDataBrokerModule {
 
+    private static final String FUTURE_CALLBACK_EXECUTOR_MAX_QUEUE_SIZE_PROP =
+            "mdsal.datastore-future-callback-queue.size";
+    private static final int DEFAULT_FUTURE_CALLBACK_EXECUTOR_MAX_QUEUE_SIZE = 1000;
+
+    private static final String FUTURE_CALLBACK_EXECUTOR_MAX_POOL_SIZE_PROP =
+            "mdsal.datastore-future-callback-pool.size";
+    private static final int DEFAULT_FUTURE_CALLBACK_EXECUTOR_MAX_POOL_SIZE = 20;
+    private static final String COMMIT_EXECUTOR_MAX_QUEUE_SIZE_PROP =
+            "mdsal.datastore-commit-queue.size";
+    private static final int DEFAULT_COMMIT_EXECUTOR_MAX_QUEUE_SIZE = 5000;
+
     public DomInmemoryDataBrokerModule(final org.opendaylight.controller.config.api.ModuleIdentifier identifier,
             final org.opendaylight.controller.config.api.DependencyResolver dependencyResolver) {
         super(identifier, dependencyResolver);
@@ -45,30 +56,55 @@ public final class DomInmemoryDataBrokerModule extends
 
     @Override
     public java.lang.AutoCloseable createInstance() {
-        ListeningExecutorService storeExecutor = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(2));
         //Initializing Operational DOM DataStore defaulting to InMemoryDOMDataStore if one is not configured
         DOMStore operStore =  getOperationalDataStoreDependency();
         if(operStore == null){
            //we will default to InMemoryDOMDataStore creation
-          operStore = new InMemoryDOMDataStore("DOM-OPER", storeExecutor);
-          //here we will register the SchemaContext listener
-          getSchemaServiceDependency().registerSchemaContextListener((InMemoryDOMDataStore)operStore);
+          operStore = InMemoryDOMDataStoreFactory.create("DOM-OPER", getSchemaServiceDependency());
         }
 
         DOMStore configStore = getConfigDataStoreDependency();
         if(configStore == null){
            //we will default to InMemoryDOMDataStore creation
-           configStore = new InMemoryDOMDataStore("DOM-CFG", storeExecutor);
-          //here we will register the SchemaContext listener
-          getSchemaServiceDependency().registerSchemaContextListener((InMemoryDOMDataStore)configStore);
+           configStore = InMemoryDOMDataStoreFactory.create("DOM-CFG", getSchemaServiceDependency());
         }
         ImmutableMap<LogicalDatastoreType, DOMStore> datastores = ImmutableMap
                 .<LogicalDatastoreType, DOMStore> builder().put(LogicalDatastoreType.OPERATIONAL, operStore)
                 .put(LogicalDatastoreType.CONFIGURATION, configStore).build();
 
+        /*
+         * We use a single-threaded executor for commits with a bounded queue capacity. If the
+         * queue capacity is reached, subsequent commit tasks will be rejected and the commits will
+         * fail. This is done to relieve back pressure. This should be an extreme scenario - either
+         * there's deadlock(s) somewhere and the controller is unstable or some rogue component is
+         * continuously hammering commits too fast or the controller is just over-capacity for the
+         * system it's running on.
+         */
+        ExecutorService commitExecutor = SpecialExecutors.newBoundedSingleThreadExecutor(
+                PropertyUtils.getIntSystemProperty(
+                        COMMIT_EXECUTOR_MAX_QUEUE_SIZE_PROP,
+                        DEFAULT_COMMIT_EXECUTOR_MAX_QUEUE_SIZE), "WriteTxCommit");
+
+        /*
+         * We use an executor for commit ListenableFuture callbacks that favors reusing available
+         * threads over creating new threads at the expense of execution time. The assumption is
+         * that most ListenableFuture callbacks won't execute a lot of business logic where we want
+         * it to run quicker - many callbacks will likely just handle error conditions and do
+         * nothing on success. The executor queue capacity is bounded and, if the capacity is
+         * reached, subsequent submitted tasks will block the caller.
+         */
+        Executor listenableFutureExecutor = SpecialExecutors.newBlockingBoundedCachedThreadPool(
+                PropertyUtils.getIntSystemProperty(
+                        FUTURE_CALLBACK_EXECUTOR_MAX_POOL_SIZE_PROP,
+                        DEFAULT_FUTURE_CALLBACK_EXECUTOR_MAX_POOL_SIZE),
+                PropertyUtils.getIntSystemProperty(
+                        FUTURE_CALLBACK_EXECUTOR_MAX_QUEUE_SIZE_PROP,
+                        DEFAULT_FUTURE_CALLBACK_EXECUTOR_MAX_QUEUE_SIZE), "CommitFutures");
+
         DOMDataBrokerImpl newDataBroker = new DOMDataBrokerImpl(datastores,
-                new DeadlockDetectingListeningExecutorService(Executors.newSingleThreadExecutor(),
-                                              TransactionCommitDeadlockException.DEADLOCK_EXECUTOR_FUNCTION));
+                new DeadlockDetectingListeningExecutorService(commitExecutor,
+                    TransactionCommitDeadlockException.DEADLOCK_EXECUTOR_FUNCTION,
+                    listenableFutureExecutor));
 
         return newDataBroker;
     }
index 9a6d12fb1872fc0a96556b06133a48894e14a532..521e2d0e731af06ac972ce2cce28f75a347ba490 100644 (file)
@@ -9,6 +9,7 @@ package org.opendaylight.controller.md.sal.dom.broker.impl;
 import java.util.List;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.RejectedExecutionException;
 
 import javax.annotation.concurrent.GuardedBy;
 
@@ -86,8 +87,18 @@ public class DOMDataCommitCoordinatorImpl implements DOMDataCommitExecutor {
         Preconditions.checkArgument(cohorts != null, "Cohorts must not be null.");
         Preconditions.checkArgument(listener != null, "Listener must not be null");
         LOG.debug("Tx: {} is submitted for execution.", transaction.getIdentifier());
-        ListenableFuture<Void> commitFuture = executor.submit(new CommitCoordinationTask(
-                transaction, cohorts, listener));
+
+        ListenableFuture<Void> commitFuture = null;
+        try {
+            commitFuture = executor.submit(new CommitCoordinationTask(transaction, cohorts, listener));
+        } catch(RejectedExecutionException e) {
+            LOG.error("The commit executor's queue is full - submit task was rejected. \n" +
+                      executor, e);
+            return Futures.immediateFailedCheckedFuture(
+                    new TransactionCommitFailedException(
+                        "Could not submit the commit task - the commit queue capacity has been exceeded.", e));
+        }
+
         if (listener.isPresent()) {
             Futures.addCallback(commitFuture, new DOMDataCommitErrorInvoker(transaction, listener.get()));
         }
index 181396fc884699e07eef9e0778e1ce3655d5e721..e9ed5b1b303592c9f8b59d0a7a1145bebeb3e716 100644 (file)
@@ -63,8 +63,10 @@ public class DOMBrokerPerformanceTest {
 
     @Before
     public void setupStore() {
-       InMemoryDOMDataStore operStore = new InMemoryDOMDataStore("OPER", MoreExecutors.sameThreadExecutor());
-       InMemoryDOMDataStore configStore = new InMemoryDOMDataStore("CFG", MoreExecutors.sameThreadExecutor());
+        InMemoryDOMDataStore operStore = new InMemoryDOMDataStore("OPER",
+                 MoreExecutors.sameThreadExecutor(), MoreExecutors.sameThreadExecutor());
+        InMemoryDOMDataStore configStore = new InMemoryDOMDataStore("CFG",
+                 MoreExecutors.sameThreadExecutor(), MoreExecutors.sameThreadExecutor());
         schemaContext = TestModel.createTestContext();
 
         operStore.onGlobalContextUpdated(schemaContext);
index 0bb16a39b90f7eb513093b18faa20815061fad3c..e57d08f1737fde07dc455eabfc53c2e5304cd53f 100644 (file)
@@ -7,19 +7,24 @@ import static org.junit.Assert.assertEquals;
 import static org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType.CONFIGURATION;
 import static org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType.OPERATIONAL;
 
+import java.util.Collections;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
 
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
+import org.mockito.Mockito;
 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataChangeScope;
 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeEvent;
 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
 import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitDeadlockException;
+import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
 import org.opendaylight.controller.md.sal.dom.api.DOMDataChangeListener;
 import org.opendaylight.controller.md.sal.dom.api.DOMDataReadTransaction;
 import org.opendaylight.controller.md.sal.dom.api.DOMDataReadWriteTransaction;
@@ -28,6 +33,7 @@ import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore;
 import org.opendaylight.controller.md.sal.dom.store.impl.TestModel;
 import org.opendaylight.controller.sal.core.spi.data.DOMStore;
 import org.opendaylight.yangtools.util.concurrent.DeadlockDetectingListeningExecutorService;
+import org.opendaylight.yangtools.util.concurrent.SpecialExecutors;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
@@ -35,6 +41,7 @@ import org.opendaylight.yangtools.yang.model.api.SchemaContext;
 
 import com.google.common.base.Optional;
 import com.google.common.collect.ImmutableMap;
+import com.google.common.util.concurrent.ForwardingExecutorService;
 import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
@@ -46,11 +53,16 @@ public class DOMBrokerTest {
     private SchemaContext schemaContext;
     private DOMDataBrokerImpl domBroker;
     private ListeningExecutorService executor;
+    private ExecutorService futureExecutor;
+    private CommitExecutorService commitExecutor;
 
     @Before
     public void setupStore() {
-        InMemoryDOMDataStore operStore = new InMemoryDOMDataStore("OPER", MoreExecutors.sameThreadExecutor());
-        InMemoryDOMDataStore configStore = new InMemoryDOMDataStore("CFG", MoreExecutors.sameThreadExecutor());
+
+        InMemoryDOMDataStore operStore = new InMemoryDOMDataStore("OPER",
+                MoreExecutors.sameThreadExecutor(), MoreExecutors.sameThreadExecutor());
+        InMemoryDOMDataStore configStore = new InMemoryDOMDataStore("CFG",
+                MoreExecutors.sameThreadExecutor(), MoreExecutors.sameThreadExecutor());
         schemaContext = TestModel.createTestContext();
 
         operStore.onGlobalContextUpdated(schemaContext);
@@ -61,8 +73,10 @@ public class DOMBrokerTest {
                 .put(OPERATIONAL, operStore) //
                 .build();
 
-        executor = new DeadlockDetectingListeningExecutorService(Executors.newSingleThreadExecutor(),
-                                          TransactionCommitDeadlockException.DEADLOCK_EXECUTOR_FUNCTION);
+        commitExecutor = new CommitExecutorService(Executors.newSingleThreadExecutor());
+        futureExecutor = SpecialExecutors.newBlockingBoundedCachedThreadPool(1, 5, "FCB");
+        executor = new DeadlockDetectingListeningExecutorService(commitExecutor,
+                TransactionCommitDeadlockException.DEADLOCK_EXECUTOR_FUNCTION, futureExecutor);
         domBroker = new DOMDataBrokerImpl(stores, executor);
     }
 
@@ -71,6 +85,10 @@ public class DOMBrokerTest {
         if( executor != null ) {
             executor.shutdownNow();
         }
+
+        if(futureExecutor != null) {
+            futureExecutor.shutdownNow();
+        }
     }
 
     @Test(timeout=10000)
@@ -137,6 +155,24 @@ public class DOMBrokerTest {
         assertTrue(afterCommitRead.isPresent());
     }
 
+    @Test(expected=TransactionCommitFailedException.class)
+    public void testRejectedCommit() throws Exception {
+
+        commitExecutor.delegate = Mockito.mock( ExecutorService.class );
+        Mockito.doThrow( new RejectedExecutionException( "mock" ) )
+            .when( commitExecutor.delegate ).execute( Mockito.any( Runnable.class ) );
+        Mockito.doNothing().when( commitExecutor.delegate ).shutdown();
+        Mockito.doReturn( Collections.emptyList() ).when( commitExecutor.delegate ).shutdownNow();
+        Mockito.doReturn( "" ).when( commitExecutor.delegate ).toString();
+        Mockito.doReturn( true ).when( commitExecutor.delegate )
+            .awaitTermination( Mockito.anyLong(), Mockito.any( TimeUnit.class ) );
+
+        DOMDataReadWriteTransaction writeTx = domBroker.newReadWriteTransaction();
+        writeTx.put( OPERATIONAL, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME) );
+
+        writeTx.submit().checkedGet( 5, TimeUnit.SECONDS );
+    }
+
     /**
      * Tests a simple DataChangeListener notification after a write.
      */
@@ -306,4 +342,18 @@ public class DOMBrokerTest {
             assertTrue( "onDataChanged was not called", latch.await( 5, TimeUnit.SECONDS ) );
         }
     }
+
+    static class CommitExecutorService extends ForwardingExecutorService {
+
+        ExecutorService delegate;
+
+        public CommitExecutorService( ExecutorService delegate ) {
+            this.delegate = delegate;
+        }
+
+        @Override
+        protected ExecutorService delegate() {
+            return delegate;
+        }
+    }
 }
index 3ea0bcefa5bab97ea12a9ead64e40cce49b78277..18b11c8300ab37a526a2018c1285a9a9817f7d3a 100644 (file)
@@ -44,8 +44,10 @@ public class DOMTransactionChainTest {
 
     @Before
     public void setupStore() {
-        InMemoryDOMDataStore operStore = new InMemoryDOMDataStore("OPER", MoreExecutors.sameThreadExecutor());
-        InMemoryDOMDataStore configStore = new InMemoryDOMDataStore("CFG", MoreExecutors.sameThreadExecutor());
+        InMemoryDOMDataStore operStore = new InMemoryDOMDataStore("OPER",
+                MoreExecutors.sameThreadExecutor(), MoreExecutors.sameThreadExecutor());
+        InMemoryDOMDataStore configStore = new InMemoryDOMDataStore("CFG",
+                MoreExecutors.sameThreadExecutor(), MoreExecutors.sameThreadExecutor());
         schemaContext = TestModel.createTestContext();
 
         operStore.onGlobalContextUpdated(schemaContext);
index 805608d479e0e76d1ecebb6a7398a884e11498cb..39a448ff6c861ac4db998bf0d17f4ec4d8e22d6d 100644 (file)
@@ -1,12 +1,9 @@
 package org.opendaylight.controller.config.yang.inmemory_datastore_provider;
 
-import java.util.concurrent.Executors;
-
-import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore;
-
-import com.google.common.util.concurrent.MoreExecutors;
+import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStoreFactory;
 
 public class InMemoryConfigDataStoreProviderModule extends org.opendaylight.controller.config.yang.inmemory_datastore_provider.AbstractInMemoryConfigDataStoreProviderModule {
+
     public InMemoryConfigDataStoreProviderModule(final org.opendaylight.controller.config.api.ModuleIdentifier identifier, final org.opendaylight.controller.config.api.DependencyResolver dependencyResolver) {
         super(identifier, dependencyResolver);
     }
@@ -22,9 +19,7 @@ public class InMemoryConfigDataStoreProviderModule extends org.opendaylight.cont
 
     @Override
     public java.lang.AutoCloseable createInstance() {
-      InMemoryDOMDataStore   ids = new InMemoryDOMDataStore("DOM-CFG", MoreExecutors.listeningDecorator(Executors.newSingleThreadExecutor()));
-      getSchemaServiceDependency().registerSchemaContextListener(ids);
-      return ids;
+        return InMemoryDOMDataStoreFactory.create("DOM-CFG", getSchemaServiceDependency());
     }
 
 }
index f4795588ab61ef62a9f74f6eb1530da1b6307141..615fe0211c0cbba8c1bc5c1a5687dd1ccea8dc33 100644 (file)
@@ -1,12 +1,9 @@
 package org.opendaylight.controller.config.yang.inmemory_datastore_provider;
 
-import java.util.concurrent.Executors;
-
-import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore;
-
-import com.google.common.util.concurrent.MoreExecutors;
+import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStoreFactory;
 
 public class InMemoryOperationalDataStoreProviderModule extends org.opendaylight.controller.config.yang.inmemory_datastore_provider.AbstractInMemoryOperationalDataStoreProviderModule {
+
     public InMemoryOperationalDataStoreProviderModule(final org.opendaylight.controller.config.api.ModuleIdentifier identifier, final org.opendaylight.controller.config.api.DependencyResolver dependencyResolver) {
         super(identifier, dependencyResolver);
     }
@@ -22,9 +19,7 @@ public class InMemoryOperationalDataStoreProviderModule extends org.opendaylight
 
     @Override
     public java.lang.AutoCloseable createInstance() {
-      InMemoryDOMDataStore ids = new InMemoryDOMDataStore("DOM-OPER", MoreExecutors.listeningDecorator(Executors.newSingleThreadExecutor()));
-      getOperationalSchemaServiceDependency().registerSchemaContextListener(ids);
-      return ids;
+        return InMemoryDOMDataStoreFactory.create("DOM-OPER", getOperationalSchemaServiceDependency());
     }
 
 }
index 27325d84a9ddf3ecf11133ab2d55137477b3fe25..ac1f2e32d531dca0074290673d9399bb90074fd9 100644 (file)
@@ -8,6 +8,8 @@
 package org.opendaylight.controller.md.sal.dom.store.impl;
 
 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeEvent;
+import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
+import org.opendaylight.yangtools.util.concurrent.NotificationManager;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
 import org.slf4j.Logger;
@@ -16,31 +18,33 @@ import org.slf4j.LoggerFactory;
 class ChangeListenerNotifyTask implements Runnable {
 
     private static final Logger LOG = LoggerFactory.getLogger(ChangeListenerNotifyTask.class);
+
     private final Iterable<? extends DataChangeListenerRegistration<?>> listeners;
     private final AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> event;
 
+    @SuppressWarnings("rawtypes")
+    private final NotificationManager<AsyncDataChangeListener,AsyncDataChangeEvent>
+                                                                            notificationMgr;
+
+    @SuppressWarnings("rawtypes")
     public ChangeListenerNotifyTask(final Iterable<? extends DataChangeListenerRegistration<?>> listeners,
-            final AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> event) {
+            final AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> event,
+            final NotificationManager<AsyncDataChangeListener,AsyncDataChangeEvent> notificationMgr) {
         this.listeners = listeners;
         this.event = event;
+        this.notificationMgr = notificationMgr;
     }
 
     @Override
     public void run() {
 
         for (DataChangeListenerRegistration<?> listener : listeners) {
-            try {
-                listener.getInstance().onDataChanged(event);
-            } catch (Exception e) {
-                LOG.error("Unhandled exception during invoking listener {} with event {}", listener, event, e);
-            }
+            notificationMgr.submitNotification(listener.getInstance(), event);
         }
-
     }
 
     @Override
     public String toString() {
         return "ChangeListenerNotifyTask [listeners=" + listeners + ", event=" + event + "]";
     }
-
 }
index c44d0909d688773d0cc37034cae21541f0823e6f..b61b3671034601fc09d7658280f986d4d30cc3ce 100644 (file)
@@ -13,11 +13,17 @@ import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.ListeningExecutorService;
+
 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataChangeScope;
+import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeEvent;
 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
 import org.opendaylight.controller.md.sal.common.api.data.OptimisticLockFailedException;
 import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
 import org.opendaylight.controller.md.sal.dom.store.impl.SnapshotBackedWriteTransaction.TransactionReadyPrototype;
+import org.opendaylight.yangtools.util.ExecutorServiceUtil;
+import org.opendaylight.yangtools.util.PropertyUtils;
+import org.opendaylight.yangtools.util.concurrent.NotificationManager;
+import org.opendaylight.yangtools.util.concurrent.QueuedNotificationManager;
 import org.opendaylight.yangtools.yang.data.api.schema.tree.ConflictingModificationAppliedException;
 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
@@ -43,8 +49,11 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import javax.annotation.concurrent.GuardedBy;
+
 import java.util.Collections;
 import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 
 import static com.google.common.base.Preconditions.checkState;
@@ -61,16 +70,51 @@ import static com.google.common.base.Preconditions.checkState;
 public class InMemoryDOMDataStore implements DOMStore, Identifiable<String>, SchemaContextListener,
         TransactionReadyPrototype,AutoCloseable {
     private static final Logger LOG = LoggerFactory.getLogger(InMemoryDOMDataStore.class);
+
+    @SuppressWarnings("rawtypes")
+    private static final QueuedNotificationManager.Invoker<AsyncDataChangeListener,
+                                       AsyncDataChangeEvent> DCL_NOTIFICATION_MGR_INVOKER =
+            new QueuedNotificationManager.Invoker<AsyncDataChangeListener,
+                                                  AsyncDataChangeEvent>() {
+
+                @SuppressWarnings("unchecked")
+                @Override
+                public void invokeListener( AsyncDataChangeListener listener,
+                                            AsyncDataChangeEvent notification ) {
+                    listener.onDataChanged(notification);
+                }
+            };
+
+    private static final String DCL_NOTIFICATION_MGR_MAX_QUEUE_SIZE_PROP =
+            "mdsal.datastore-dcl-notification-queue.size";
+
+    private static final int DEFAULT_DCL_NOTIFICATION_MGR_MAX_QUEUE_SIZE = 1000;
+
     private final DataTree dataTree = InMemoryDataTreeFactory.getInstance().create();
     private final ListenerTree listenerTree = ListenerTree.create();
     private final AtomicLong txCounter = new AtomicLong(0);
-    private final ListeningExecutorService executor;
+    private final ListeningExecutorService listeningExecutor;
+
+    @SuppressWarnings("rawtypes")
+    private final NotificationManager<AsyncDataChangeListener,AsyncDataChangeEvent>
+                                                              dataChangeListenerNotificationManager;
+    private final ExecutorService dataChangeListenerExecutor;
 
     private final String name;
 
-    public InMemoryDOMDataStore(final String name, final ListeningExecutorService executor) {
+    public InMemoryDOMDataStore(final String name, final ListeningExecutorService listeningExecutor,
+            final ExecutorService dataChangeListenerExecutor) {
         this.name = Preconditions.checkNotNull(name);
-        this.executor = Preconditions.checkNotNull(executor);
+        this.listeningExecutor = Preconditions.checkNotNull(listeningExecutor);
+
+        this.dataChangeListenerExecutor = Preconditions.checkNotNull(dataChangeListenerExecutor);
+
+        int maxDCLQueueSize = PropertyUtils.getIntSystemProperty(
+                DCL_NOTIFICATION_MGR_MAX_QUEUE_SIZE_PROP, DEFAULT_DCL_NOTIFICATION_MGR_MAX_QUEUE_SIZE );
+
+        dataChangeListenerNotificationManager =
+                new QueuedNotificationManager<>(this.dataChangeListenerExecutor,
+                        DCL_NOTIFICATION_MGR_INVOKER, maxDCLQueueSize, "DataChangeListenerQueueMgr");
     }
 
     @Override
@@ -104,8 +148,9 @@ public class InMemoryDOMDataStore implements DOMStore, Identifiable<String>, Sch
     }
 
     @Override
-    public void close(){
-        executor.shutdownNow();
+    public void close() {
+        ExecutorServiceUtil.tryGracefulShutdown(listeningExecutor, 30, TimeUnit.SECONDS);
+        ExecutorServiceUtil.tryGracefulShutdown(dataChangeListenerExecutor, 30, TimeUnit.SECONDS);
     }
     @Override
     public <L extends AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>> ListenerRegistration<L> registerChangeListener(
@@ -132,7 +177,9 @@ public class InMemoryDOMDataStore implements DOMStore, Identifiable<String>, Sch
                         .setAfter(data) //
                         .addCreated(path, data) //
                         .build();
-                executor.submit(new ChangeListenerNotifyTask(Collections.singletonList(reg), event));
+
+                new ChangeListenerNotifyTask(Collections.singletonList(reg), event,
+                        dataChangeListenerNotificationManager).run();
             }
         }
 
@@ -221,8 +268,9 @@ public class InMemoryDOMDataStore implements DOMStore, Identifiable<String>, Sch
         @Override
         public void close() {
 
-             executor.shutdownNow();
-
+            // FIXME: this call doesn't look right here - listeningExecutor is shared and owned
+            // by the outer class.
+            //listeningExecutor.shutdownNow();
         }
 
         protected synchronized void onTransactionFailed(final SnapshotBackedWriteTransaction transaction,
@@ -308,7 +356,7 @@ public class InMemoryDOMDataStore implements DOMStore, Identifiable<String>, Sch
 
         @Override
         public ListenableFuture<Boolean> canCommit() {
-            return executor.submit(new Callable<Boolean>() {
+            return listeningExecutor.submit(new Callable<Boolean>() {
                 @Override
                 public Boolean call() throws TransactionCommitFailedException {
                     try {
@@ -330,11 +378,12 @@ public class InMemoryDOMDataStore implements DOMStore, Identifiable<String>, Sch
 
         @Override
         public ListenableFuture<Void> preCommit() {
-            return executor.submit(new Callable<Void>() {
+            return listeningExecutor.submit(new Callable<Void>() {
                 @Override
                 public Void call() {
                     candidate = dataTree.prepare(modification);
-                    listenerResolver = ResolveDataChangeEventsTask.create(candidate, listenerTree);
+                    listenerResolver = ResolveDataChangeEventsTask.create(candidate, listenerTree,
+                            dataChangeListenerNotificationManager);
                     return null;
                 }
             });
@@ -359,7 +408,7 @@ public class InMemoryDOMDataStore implements DOMStore, Identifiable<String>, Sch
 
                 for (ChangeListenerNotifyTask task : listenerResolver.call()) {
                     LOG.trace("Scheduling invocation of listeners: {}", task);
-                    executor.submit(task);
+                    task.run();
                 }
             }
 
diff --git a/opendaylight/md-sal/sal-inmemory-datastore/src/main/java/org/opendaylight/controller/md/sal/dom/store/impl/InMemoryDOMDataStoreFactory.java b/opendaylight/md-sal/sal-inmemory-datastore/src/main/java/org/opendaylight/controller/md/sal/dom/store/impl/InMemoryDOMDataStoreFactory.java
new file mode 100644 (file)
index 0000000..c853a13
--- /dev/null
@@ -0,0 +1,72 @@
+/*
+ * Copyright (c) 2014 Brocade Communications Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.md.sal.dom.store.impl;
+
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+import javax.annotation.Nullable;
+
+import org.opendaylight.controller.sal.core.api.model.SchemaService;
+import org.opendaylight.yangtools.util.concurrent.SpecialExecutors;
+import org.opendaylight.yangtools.util.PropertyUtils;
+import com.google.common.util.concurrent.MoreExecutors;
+
+/**
+ * A factory for creating InMemoryDOMDataStore instances.
+ *
+ * @author Thomas Pantelis
+ */
+public final class InMemoryDOMDataStoreFactory {
+
+    private static final String DCL_EXECUTOR_MAX_QUEUE_SIZE_PROP =
+            "mdsal.datastore-dcl-notification-queue.size";
+    private static final int DEFAULT_DCL_EXECUTOR_MAX_QUEUE_SIZE = 1000;
+
+    private static final String DCL_EXECUTOR_MAX_POOL_SIZE_PROP =
+            "mdsal.datastore-dcl-notification-pool.size";
+    private static final int DEFAULT_DCL_EXECUTOR_MAX_POOL_SIZE = 20;
+
+    private InMemoryDOMDataStoreFactory() {
+    }
+
+    /**
+     * Creates an InMemoryDOMDataStore instance.
+     *
+     * @param name the name of the data store
+     * @param schemaService the SchemaService to which to register the data store.
+     * @return an InMemoryDOMDataStore instance
+     */
+    public static InMemoryDOMDataStore create(final String name,
+            @Nullable final SchemaService schemaService) {
+
+        // For DataChangeListener notifications we use an executor that provides the fastest
+        // task execution time to get higher throughput as DataChangeListeners typically provide
+        // much of the business logic for a data model. If the executor queue size limit is reached,
+        // subsequent submitted notifications will block the calling thread.
+
+        int dclExecutorMaxQueueSize = PropertyUtils.getIntSystemProperty(
+                DCL_EXECUTOR_MAX_QUEUE_SIZE_PROP, DEFAULT_DCL_EXECUTOR_MAX_QUEUE_SIZE);
+        int dclExecutorMaxPoolSize = PropertyUtils.getIntSystemProperty(
+                DCL_EXECUTOR_MAX_POOL_SIZE_PROP, DEFAULT_DCL_EXECUTOR_MAX_POOL_SIZE);
+
+        ExecutorService dataChangeListenerExecutor = SpecialExecutors.newBlockingBoundedFastThreadPool(
+                dclExecutorMaxPoolSize, dclExecutorMaxQueueSize, name + "-DCL" );
+
+        InMemoryDOMDataStore dataStore = new InMemoryDOMDataStore(name,
+                MoreExecutors.listeningDecorator(Executors.newSingleThreadExecutor()),
+                dataChangeListenerExecutor);
+
+        if(schemaService != null) {
+            schemaService.registerSchemaContextListener(dataStore);
+        }
+
+        return dataStore;
+    }
+}
index 3ddf0b60faf07323f7f26cb4d7488851015ef688..d8feaa71f6ac104132f14c0659677f566ee530c5 100644 (file)
@@ -24,12 +24,15 @@ import java.util.Map.Entry;
 import java.util.Set;
 import java.util.concurrent.Callable;
 
+import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeEvent;
+import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataChangeScope;
 import org.opendaylight.controller.md.sal.dom.store.impl.DOMImmutableDataChangeEvent.Builder;
 import org.opendaylight.controller.md.sal.dom.store.impl.DOMImmutableDataChangeEvent.SimpleEventFactory;
 import org.opendaylight.controller.md.sal.dom.store.impl.tree.ListenerTree;
 import org.opendaylight.controller.md.sal.dom.store.impl.tree.ListenerTree.Node;
 import org.opendaylight.controller.md.sal.dom.store.impl.tree.ListenerTree.Walker;
+import org.opendaylight.yangtools.util.concurrent.NotificationManager;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifier;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifierWithPredicates;
@@ -57,9 +60,15 @@ final class ResolveDataChangeEventsTask implements Callable<Iterable<ChangeListe
     private final DataTreeCandidate candidate;
     private final ListenerTree listenerRoot;
 
-    public ResolveDataChangeEventsTask(final DataTreeCandidate candidate, final ListenerTree listenerTree) {
+    @SuppressWarnings("rawtypes")
+    private final NotificationManager<AsyncDataChangeListener, AsyncDataChangeEvent> notificationMgr;
+
+    @SuppressWarnings("rawtypes")
+    public ResolveDataChangeEventsTask(final DataTreeCandidate candidate, final ListenerTree listenerTree,
+            final NotificationManager<AsyncDataChangeListener, AsyncDataChangeEvent> notificationMgr) {
         this.candidate = Preconditions.checkNotNull(candidate);
         this.listenerRoot = Preconditions.checkNotNull(listenerTree);
+        this.notificationMgr = Preconditions.checkNotNull(notificationMgr);
     }
 
     /**
@@ -120,7 +129,7 @@ final class ResolveDataChangeEventsTask implements Callable<Iterable<ChangeListe
      * @param listeners
      * @param entries
      */
-    private static void addNotificationTask(final ImmutableList.Builder<ChangeListenerNotifyTask> taskListBuilder,
+    private void addNotificationTask(final ImmutableList.Builder<ChangeListenerNotifyTask> taskListBuilder,
             final ListenerTree.Node listeners, final Collection<DOMImmutableDataChangeEvent> entries) {
 
         if (!entries.isEmpty()) {
@@ -141,7 +150,7 @@ final class ResolveDataChangeEventsTask implements Callable<Iterable<ChangeListe
      * @param listeners
      * @param event
      */
-    private static void addNotificationTaskByScope(
+    private void addNotificationTaskByScope(
             final ImmutableList.Builder<ChangeListenerNotifyTask> taskListBuilder, final ListenerTree.Node listeners,
             final DOMImmutableDataChangeEvent event) {
         DataChangeScope eventScope = event.getScope();
@@ -150,11 +159,11 @@ final class ResolveDataChangeEventsTask implements Callable<Iterable<ChangeListe
             List<DataChangeListenerRegistration<?>> listenerSet = Collections
                     .<DataChangeListenerRegistration<?>> singletonList(listenerReg);
             if (eventScope == DataChangeScope.BASE) {
-                taskListBuilder.add(new ChangeListenerNotifyTask(listenerSet, event));
+                taskListBuilder.add(new ChangeListenerNotifyTask(listenerSet, event, notificationMgr));
             } else if (eventScope == DataChangeScope.ONE && listenerScope != DataChangeScope.BASE) {
-                taskListBuilder.add(new ChangeListenerNotifyTask(listenerSet, event));
+                taskListBuilder.add(new ChangeListenerNotifyTask(listenerSet, event, notificationMgr));
             } else if (eventScope == DataChangeScope.SUBTREE && listenerScope == DataChangeScope.SUBTREE) {
-                taskListBuilder.add(new ChangeListenerNotifyTask(listenerSet, event));
+                taskListBuilder.add(new ChangeListenerNotifyTask(listenerSet, event, notificationMgr));
             }
         }
     }
@@ -172,7 +181,7 @@ final class ResolveDataChangeEventsTask implements Callable<Iterable<ChangeListe
      * @param listeners
      * @param entries
      */
-    private static void addNotificationTasksAndMergeEvents(
+    private void addNotificationTasksAndMergeEvents(
             final ImmutableList.Builder<ChangeListenerNotifyTask> taskListBuilder, final ListenerTree.Node listeners,
             final Collection<DOMImmutableDataChangeEvent> entries) {
 
@@ -210,14 +219,14 @@ final class ResolveDataChangeEventsTask implements Callable<Iterable<ChangeListe
         }
     }
 
-    private static void addNotificationTaskExclusively(
+    private void addNotificationTaskExclusively(
             final ImmutableList.Builder<ChangeListenerNotifyTask> taskListBuilder, final Node listeners,
             final DOMImmutableDataChangeEvent event) {
         for (DataChangeListenerRegistration<?> listener : listeners.getListeners()) {
             if (listener.getScope() == event.getScope()) {
                 Set<DataChangeListenerRegistration<?>> listenerSet = Collections
                         .<DataChangeListenerRegistration<?>> singleton(listener);
-                taskListBuilder.add(new ChangeListenerNotifyTask(listenerSet, event));
+                taskListBuilder.add(new ChangeListenerNotifyTask(listenerSet, event, notificationMgr));
             }
         }
     }
@@ -519,7 +528,10 @@ final class ResolveDataChangeEventsTask implements Callable<Iterable<ChangeListe
         }
     }
 
-    public static ResolveDataChangeEventsTask create(final DataTreeCandidate candidate, final ListenerTree listenerTree) {
-        return new ResolveDataChangeEventsTask(candidate, listenerTree);
+    @SuppressWarnings("rawtypes")
+    public static ResolveDataChangeEventsTask create(final DataTreeCandidate candidate,
+            final ListenerTree listenerTree,
+            final NotificationManager<AsyncDataChangeListener,AsyncDataChangeEvent> notificationMgr) {
+        return new ResolveDataChangeEventsTask(candidate, listenerTree, notificationMgr);
     }
 }
index 3176ca764de198dac326dbfb9db51f6f2190ded8..76a9354d1aea79cde305b087a830687b75d91889 100644 (file)
@@ -9,7 +9,7 @@ package org.opendaylight.controller.md.sal.dom.store.impl;
 
 import java.util.Collection;
 import java.util.Map;
-
+import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
 import org.opendaylight.controller.md.sal.dom.store.impl.DatastoreTestTask.WriteTransactionCustomizer;
@@ -18,6 +18,7 @@ import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controll
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.test.list.rev140701.two.level.list.TopLevelList;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.test.list.rev140701.two.level.list.top.level.list.NestedList;
 import org.opendaylight.yangtools.sal.binding.generator.impl.ModuleInfoBackedContext;
+import org.opendaylight.yangtools.util.concurrent.SpecialExecutors;
 import org.opendaylight.yangtools.yang.binding.YangModuleInfo;
 import org.opendaylight.yangtools.yang.binding.util.BindingReflections;
 import org.opendaylight.yangtools.yang.common.QName;
@@ -48,6 +49,7 @@ public abstract class AbstractDataChangeListenerTest {
 
     private InMemoryDOMDataStore datastore;
     private SchemaContext schemaContext;
+    private TestDCLExecutorService dclExecutorService;
 
     @Before
     public final void setup() throws Exception {
@@ -56,13 +58,24 @@ public abstract class AbstractDataChangeListenerTest {
         ModuleInfoBackedContext context = ModuleInfoBackedContext.create();
         context.registerModuleInfo(moduleInfo);
         schemaContext = context.tryToCreateSchemaContext().get();
+
+        dclExecutorService = new TestDCLExecutorService(
+                SpecialExecutors.newBlockingBoundedFastThreadPool(1, 10, "DCL" ));
+
         datastore = new InMemoryDOMDataStore("TEST",
-                MoreExecutors.sameThreadExecutor());
+                MoreExecutors.sameThreadExecutor(), dclExecutorService );
         datastore.onGlobalContextUpdated(schemaContext);
     }
 
+    @After
+    public void tearDown() {
+        if( dclExecutorService != null ) {
+            dclExecutorService.shutdownNow();
+        }
+    }
+
     public final DatastoreTestTask newTestTask() {
-        return new DatastoreTestTask(datastore).cleanup(DatastoreTestTask
+        return new DatastoreTestTask(datastore, dclExecutorService).cleanup(DatastoreTestTask
                 .simpleDelete(TOP_LEVEL));
     }
 
index 26987a6fba6426169ee7df98340c8ac0f0f4be47..98d79bee8bffaf40adacec43c95a665c5300b495 100644 (file)
@@ -8,9 +8,11 @@
 package org.opendaylight.controller.md.sal.dom.store.impl;
 
 import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 
 import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 
 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataChangeScope;
 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeEvent;
@@ -37,11 +39,13 @@ public class DatastoreTestTask {
     private WriteTransactionCustomizer cleanup;
     private YangInstanceIdentifier changePath;
     private DataChangeScope changeScope;
-    private boolean postSetup = false;
+    private volatile boolean postSetup = false;
     private final ChangeEventListener internalListener;
+    private final TestDCLExecutorService dclExecutorService;
 
-    public DatastoreTestTask(final DOMStore datastore) {
+    public DatastoreTestTask(final DOMStore datastore, final TestDCLExecutorService dclExecutorService) {
         this.store = datastore;
+        this.dclExecutorService = dclExecutorService;
         internalListener = new ChangeEventListener();
     }
 
@@ -79,7 +83,7 @@ public class DatastoreTestTask {
         return this;
     }
 
-    public void run() throws InterruptedException, ExecutionException {
+    public void run() throws InterruptedException, ExecutionException, TimeoutException {
         if (setup != null) {
             execute(setup);
         }
@@ -89,13 +93,17 @@ public class DatastoreTestTask {
         }
 
         Preconditions.checkState(write != null, "Write Transaction must be set.");
+
         postSetup = true;
+        dclExecutorService.afterTestSetup();
+
         execute(write);
         if (registration != null) {
             registration.close();
         }
+
         if (changeListener != null) {
-            changeListener.onDataChanged(internalListener.receivedChange.get());
+            changeListener.onDataChanged(getChangeEvent());
         }
         if (read != null) {
             read.verify(store.newReadOnlyTransaction());
@@ -105,8 +113,26 @@ public class DatastoreTestTask {
         }
     }
 
-    public Future<AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>>> getChangeEvent() {
-        return internalListener.receivedChange;
+    public AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> getChangeEvent() {
+        try {
+            return internalListener.receivedChange.get(10, TimeUnit.SECONDS);
+        } catch( Exception e ) {
+            fail( "Error getting the AsyncDataChangeEvent from the Future: " + e );
+        }
+
+        // won't get here
+        return null;
+    }
+
+    public void verifyNoChangeEvent() {
+        try {
+            Object unexpected = internalListener.receivedChange.get(500, TimeUnit.MILLISECONDS);
+            fail( "Got unexpected AsyncDataChangeEvent from the Future: " + unexpected );
+        } catch( TimeoutException e ) {
+            // Expected
+        } catch( Exception e ) {
+            fail( "Error getting the AsyncDataChangeEvent from the Future: " + e );
+        }
     }
 
     private void execute(final WriteTransactionCustomizer writeCustomizer) throws InterruptedException,
index 54d2043dc76a6df85cafdfe468f1bf7ff5b989f3..84337de419b2d24f82425fb889b89786e83027ce 100644 (file)
@@ -20,7 +20,7 @@ public abstract class DefaultDataChangeListenerTestSuite extends AbstractDataCha
     abstract protected void customizeTask(DatastoreTestTask task);
 
     @Test
-    public final void putTopLevelOneNested() throws InterruptedException, ExecutionException {
+    public final void putTopLevelOneNested() throws Exception {
 
         DatastoreTestTask task = newTestTask().test(writeOneTopMultipleNested(FOO, BAR));
         customizeTask(task);
@@ -29,7 +29,7 @@ public abstract class DefaultDataChangeListenerTestSuite extends AbstractDataCha
     }
 
     @Test
-    public final void existingTopWriteSibling() throws InterruptedException, ExecutionException {
+    public final void existingTopWriteSibling() throws Exception {
         DatastoreTestTask task = newTestTask().setup(writeOneTopMultipleNested(FOO)).test(
                 new WriteTransactionCustomizer() {
                     @Override
@@ -46,7 +46,7 @@ public abstract class DefaultDataChangeListenerTestSuite extends AbstractDataCha
 
 
     @Test
-    public final void existingTopWriteTwoNested() throws InterruptedException, ExecutionException {
+    public final void existingTopWriteTwoNested() throws Exception {
         DatastoreTestTask task = newTestTask().setup(writeOneTopMultipleNested(FOO)).test(
                 new WriteTransactionCustomizer() {
                     @Override
@@ -64,7 +64,7 @@ public abstract class DefaultDataChangeListenerTestSuite extends AbstractDataCha
 
 
     @Test
-    public final void existingOneNestedWriteAdditionalNested() throws InterruptedException, ExecutionException {
+    public final void existingOneNestedWriteAdditionalNested() throws Exception {
         DatastoreTestTask task = newTestTask().setup(writeOneTopMultipleNested(FOO, BAR)).test(
                 new WriteTransactionCustomizer() {
                     @Override
@@ -79,11 +79,10 @@ public abstract class DefaultDataChangeListenerTestSuite extends AbstractDataCha
 
     protected abstract void existingOneNestedWriteAdditionalNested(DatastoreTestTask task) throws InterruptedException, ExecutionException;
 
-    protected abstract void putTopLevelOneNested(DatastoreTestTask task) throws InterruptedException,
-            ExecutionException;
+    protected abstract void putTopLevelOneNested(DatastoreTestTask task) throws Exception;
 
     @Test
-    public final void replaceTopLevelNestedChanged() throws InterruptedException, ExecutionException {
+    public final void replaceTopLevelNestedChanged() throws Exception {
         DatastoreTestTask task = newTestTask().setup(writeOneTopMultipleNested(FOO, BAR)).test(
                 writeOneTopMultipleNested(FOO, BAZ));
         customizeTask(task);
@@ -95,7 +94,7 @@ public abstract class DefaultDataChangeListenerTestSuite extends AbstractDataCha
             ExecutionException;
 
     @Test
-    public final void putTopLevelWithTwoNested() throws InterruptedException, ExecutionException {
+    public final void putTopLevelWithTwoNested() throws Exception {
 
         DatastoreTestTask task = newTestTask().test(writeOneTopMultipleNested(FOO, BAR, BAZ));
         customizeTask(task);
@@ -107,7 +106,7 @@ public abstract class DefaultDataChangeListenerTestSuite extends AbstractDataCha
             ExecutionException;
 
     @Test
-    public final void twoNestedExistsOneIsDeleted() throws InterruptedException, ExecutionException {
+    public final void twoNestedExistsOneIsDeleted() throws Exception {
 
         DatastoreTestTask task = newTestTask().setup(writeOneTopMultipleNested(FOO, BAR, BAZ)).test(
                 deleteNested(FOO, BAZ));
@@ -120,7 +119,7 @@ public abstract class DefaultDataChangeListenerTestSuite extends AbstractDataCha
             ExecutionException;
 
     @Test
-    public final void nestedListExistsRootDeleted() throws InterruptedException, ExecutionException {
+    public final void nestedListExistsRootDeleted() throws Exception {
 
         DatastoreTestTask task = newTestTask().cleanup(null).setup(writeOneTopMultipleNested(FOO, BAR, BAZ))
                 .test(DatastoreTestTask.simpleDelete(TOP_LEVEL));
index 9b105aa3064121b518295220a5fb3351d030c13b..4d38858667173df23ed112fa5e13c139914bdb73 100644 (file)
@@ -47,7 +47,8 @@ public class InMemoryDataStoreTest {
 
     @Before
     public void setupStore() {
-        domStore = new InMemoryDOMDataStore("TEST", MoreExecutors.sameThreadExecutor());
+        domStore = new InMemoryDOMDataStore("TEST", MoreExecutors.sameThreadExecutor(),
+                MoreExecutors.sameThreadExecutor());
         schemaContext = TestModel.createTestContext();
         domStore.onGlobalContextUpdated(schemaContext);
     }
index 905dc0d19b8c1f3b671de983b4dbda5de89526ce..43b339e506d48670b10b896183d11db8d3c83a2f 100644 (file)
@@ -23,7 +23,7 @@ public class RootScopeSubtreeTest extends DefaultDataChangeListenerTestSuite {
 
     @Override
     public void putTopLevelOneNested(final DatastoreTestTask task) throws InterruptedException, ExecutionException {
-        AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> change = task.getChangeEvent().get();
+        AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> change = task.getChangeEvent();
 
         assertContains(change.getCreatedData(), TOP_LEVEL, path(FOO), path(FOO, BAR));
         assertEmpty(change.getUpdatedData());
@@ -34,7 +34,7 @@ public class RootScopeSubtreeTest extends DefaultDataChangeListenerTestSuite {
     public void replaceTopLevelNestedChanged(final DatastoreTestTask task) throws InterruptedException,
             ExecutionException {
 
-        AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> change = task.getChangeEvent().get();
+        AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> change = task.getChangeEvent();
 
         assertContains(change.getCreatedData(), path(FOO, BAZ));
         assertContains(change.getUpdatedData(), TOP_LEVEL, path(FOO));
@@ -45,7 +45,7 @@ public class RootScopeSubtreeTest extends DefaultDataChangeListenerTestSuite {
     protected void putTopLevelWithTwoNested(final DatastoreTestTask task) throws InterruptedException,
             ExecutionException {
 
-        AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> change = task.getChangeEvent().get();
+        AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> change = task.getChangeEvent();
 
         assertContains(change.getCreatedData(), TOP_LEVEL, path(FOO), path(FOO, BAR), path(FOO, BAZ));
         assertEmpty(change.getUpdatedData());
@@ -56,7 +56,7 @@ public class RootScopeSubtreeTest extends DefaultDataChangeListenerTestSuite {
     protected void twoNestedExistsOneIsDeleted(final DatastoreTestTask task) throws InterruptedException,
             ExecutionException {
 
-        AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> change = task.getChangeEvent().get();
+        AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> change = task.getChangeEvent();
 
         assertEmpty(change.getCreatedData());
         assertContains(change.getUpdatedData(), TOP_LEVEL, path(FOO));
@@ -67,7 +67,7 @@ public class RootScopeSubtreeTest extends DefaultDataChangeListenerTestSuite {
     protected void nestedListExistsRootDeleted(final DatastoreTestTask task) throws InterruptedException,
             ExecutionException {
 
-        AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> change = task.getChangeEvent().get();
+        AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> change = task.getChangeEvent();
 
         assertEmpty(change.getCreatedData());
         assertEmpty(change.getUpdatedData());
@@ -76,7 +76,7 @@ public class RootScopeSubtreeTest extends DefaultDataChangeListenerTestSuite {
 
     @Override
     protected void existingOneNestedWriteAdditionalNested(final DatastoreTestTask task) throws InterruptedException, ExecutionException {
-        AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> change = task.getChangeEvent().get();
+        AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> change = task.getChangeEvent();
 
         assertContains(change.getCreatedData(), path(FOO,BAZ));
         assertNotContains(change.getCreatedData(), path(FOO,BAR));
@@ -86,7 +86,7 @@ public class RootScopeSubtreeTest extends DefaultDataChangeListenerTestSuite {
 
     @Override
     protected void existingTopWriteTwoNested(final DatastoreTestTask task) throws InterruptedException, ExecutionException {
-        AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> change = task.getChangeEvent().get();
+        AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> change = task.getChangeEvent();
 
         assertContains(change.getCreatedData(), path(FOO,BAR),path(FOO,BAZ));
         assertContains(change.getUpdatedData(), TOP_LEVEL, path(FOO));
@@ -96,7 +96,7 @@ public class RootScopeSubtreeTest extends DefaultDataChangeListenerTestSuite {
 
     @Override
     protected void existingTopWriteSibling(final DatastoreTestTask task) throws InterruptedException, ExecutionException {
-        AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> change = task.getChangeEvent().get();
+        AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> change = task.getChangeEvent();
 
         assertContains(change.getCreatedData(), path(FOO_SIBLING));
         assertContains(change.getUpdatedData(), TOP_LEVEL);
index 5cba93a712f6b313d0a40bd9ece03ec88135d567..364712c7b393ba87ee1d4900e9b3fab9fed41185 100644 (file)
@@ -34,7 +34,8 @@ public class SchemaUpdateForTransactionTest {
 
     @Before
     public void setupStore() {
-        domStore = new InMemoryDOMDataStore("TEST", MoreExecutors.sameThreadExecutor());
+        domStore = new InMemoryDOMDataStore("TEST", MoreExecutors.sameThreadExecutor(),
+                MoreExecutors.sameThreadExecutor());
         loadSchemas(RockTheHouseInput.class);
     }
 
diff --git a/opendaylight/md-sal/sal-inmemory-datastore/src/test/java/org/opendaylight/controller/md/sal/dom/store/impl/TestDCLExecutorService.java b/opendaylight/md-sal/sal-inmemory-datastore/src/test/java/org/opendaylight/controller/md/sal/dom/store/impl/TestDCLExecutorService.java
new file mode 100644 (file)
index 0000000..f6e6461
--- /dev/null
@@ -0,0 +1,43 @@
+/*
+ * Copyright (c) 2014 Brocade Communications Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.md.sal.dom.store.impl;
+
+import java.util.concurrent.ExecutorService;
+
+import com.google.common.util.concurrent.ForwardingExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
+
+/**
+ * A forwarding Executor used by unit tests for DataChangeListener notifications
+ *
+ * @author Thomas Pantelis
+ */
+public class TestDCLExecutorService extends ForwardingExecutorService {
+
+    // Start with a same thread executor to avoid timing issues during test setup.
+    private volatile ExecutorService currentExecutor = MoreExecutors.sameThreadExecutor();
+
+    // The real executor to use when test setup is complete.
+    private final ExecutorService postSetupExecutor;
+
+
+    public TestDCLExecutorService( ExecutorService postSetupExecutor ) {
+        this.postSetupExecutor = postSetupExecutor;
+    }
+
+    @Override
+    protected ExecutorService delegate() {
+        return currentExecutor;
+    }
+
+    public void afterTestSetup() {
+        // Test setup complete - switch to the real executor.
+        currentExecutor = postSetupExecutor;
+    }
+}
\ No newline at end of file
index 7c8676eff56728f9b5307e177f9ffcbd0c4bb87f..cdf465aacee9da2226d6a2f3b9721b6f8770306e 100644 (file)
@@ -11,8 +11,6 @@ import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
 
 import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Future;
-
 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataChangeScope;
 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeEvent;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.test.list.rev140701.two.level.list.TopLevelList;
@@ -32,7 +30,7 @@ public class WildcardedScopeBaseTest extends DefaultDataChangeListenerTestSuite
     @Override
     public void putTopLevelOneNested(final DatastoreTestTask task) throws InterruptedException, ExecutionException {
 
-        AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> change = task.getChangeEvent().get();
+        AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> change = task.getChangeEvent();
 
         assertNotNull(change);
 
@@ -48,7 +46,7 @@ public class WildcardedScopeBaseTest extends DefaultDataChangeListenerTestSuite
     public void replaceTopLevelNestedChanged(final DatastoreTestTask task) throws InterruptedException,
             ExecutionException {
 
-        AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> change = task.getChangeEvent().get();
+        AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> change = task.getChangeEvent();
         assertNotNull(change);
 
         assertContains(change.getCreatedData(), path(FOO, BAZ));
@@ -62,7 +60,7 @@ public class WildcardedScopeBaseTest extends DefaultDataChangeListenerTestSuite
     protected void putTopLevelWithTwoNested(final DatastoreTestTask task) throws InterruptedException,
             ExecutionException {
 
-        AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> change = task.getChangeEvent().get();
+        AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> change = task.getChangeEvent();
         assertNotNull(change);
         assertFalse(change.getCreatedData().isEmpty());
 
@@ -77,7 +75,6 @@ public class WildcardedScopeBaseTest extends DefaultDataChangeListenerTestSuite
     protected void twoNestedExistsOneIsDeleted(final DatastoreTestTask task) throws InterruptedException,
             ExecutionException {
 
-        Future<?> future = task.getChangeEvent();
         /*
          * Base listener should be notified only and only if actual node changed its state,
          * since deletion of child, did not result in change of node we are listening
@@ -85,14 +82,14 @@ public class WildcardedScopeBaseTest extends DefaultDataChangeListenerTestSuite
          * and this means settable future containing receivedDataChangeEvent is not done.
          *
          */
-        assertFalse(future.isDone());
+        task.verifyNoChangeEvent();
     }
 
     @Override
     public void nestedListExistsRootDeleted(final DatastoreTestTask task) throws InterruptedException,
             ExecutionException {
 
-        AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> change = task.getChangeEvent().get();
+        AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> change = task.getChangeEvent();
 
         assertEmpty(change.getCreatedData());
         assertEmpty(change.getUpdatedData());
@@ -103,7 +100,6 @@ public class WildcardedScopeBaseTest extends DefaultDataChangeListenerTestSuite
 
     @Override
     protected void existingOneNestedWriteAdditionalNested(final DatastoreTestTask task) {
-        Future<?> future = task.getChangeEvent();
         /*
          * One listener should be notified only and only if actual node changed its state,
          * since deletion of nested child (in this case /nested-list/nested-list[foo],
@@ -112,12 +108,11 @@ public class WildcardedScopeBaseTest extends DefaultDataChangeListenerTestSuite
          * and this means settable future containing receivedDataChangeEvent is not done.
          *
          */
-        assertFalse(future.isDone());
+        task.verifyNoChangeEvent();
     }
 
     @Override
     protected void existingTopWriteTwoNested(final DatastoreTestTask task) throws InterruptedException, ExecutionException {
-        Future<?> future = task.getChangeEvent();
         /*
          * One listener should be notified only and only if actual node changed its state,
          * since deletion of nested child (in this case /nested-list/nested-list[foo],
@@ -126,12 +121,12 @@ public class WildcardedScopeBaseTest extends DefaultDataChangeListenerTestSuite
          * and this means settable future containing receivedDataChangeEvent is not done.
          *
          */
-        assertFalse(future.isDone());
+        task.verifyNoChangeEvent();
     }
 
     @Override
     protected void existingTopWriteSibling(final DatastoreTestTask task) throws InterruptedException, ExecutionException {
-        AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> change = task.getChangeEvent().get();
+        AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> change = task.getChangeEvent();
 
         assertContains(change.getCreatedData(), path(FOO_SIBLING));
         assertNotContains(change.getUpdatedData(), path(FOO), TOP_LEVEL);
index ac18d5c976d2ae0e9691d52eeac14fa7aef8da91..3407e0ffa4c6511a55b7d11c5bdd4ca9a0a718f2 100644 (file)
@@ -11,8 +11,6 @@ import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
 
 import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Future;
-
 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataChangeScope;
 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeEvent;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.test.list.rev140701.two.level.list.TopLevelList;
@@ -32,7 +30,7 @@ public class WildcardedScopeOneTest extends DefaultDataChangeListenerTestSuite {
     @Override
     public void putTopLevelOneNested(final DatastoreTestTask task) throws InterruptedException, ExecutionException {
 
-        AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> change = task.getChangeEvent().get();
+        AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> change = task.getChangeEvent();
 
         assertNotNull(change);
 
@@ -48,7 +46,7 @@ public class WildcardedScopeOneTest extends DefaultDataChangeListenerTestSuite {
     public void replaceTopLevelNestedChanged(final DatastoreTestTask task) throws InterruptedException,
             ExecutionException {
 
-        AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> change = task.getChangeEvent().get();
+        AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> change = task.getChangeEvent();
         assertNotNull(change);
 
         assertContains(change.getCreatedData(), path(FOO, BAZ));
@@ -62,7 +60,7 @@ public class WildcardedScopeOneTest extends DefaultDataChangeListenerTestSuite {
     protected void putTopLevelWithTwoNested(final DatastoreTestTask task) throws InterruptedException,
             ExecutionException {
 
-        AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> change = task.getChangeEvent().get();
+        AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> change = task.getChangeEvent();
         assertNotNull(change);
         assertFalse(change.getCreatedData().isEmpty());
 
@@ -77,7 +75,6 @@ public class WildcardedScopeOneTest extends DefaultDataChangeListenerTestSuite {
     protected void twoNestedExistsOneIsDeleted(final DatastoreTestTask task) throws InterruptedException,
             ExecutionException {
 
-        Future<?> future = task.getChangeEvent();
         /*
          * One listener should be notified only and only if actual node changed its state,
          * since deletion of nested child (in this case /nested-list/nested-list[foo],
@@ -86,14 +83,14 @@ public class WildcardedScopeOneTest extends DefaultDataChangeListenerTestSuite {
          * and this means settable future containing receivedDataChangeEvent is not done.
          *
          */
-        assertFalse(future.isDone());
+        task.verifyNoChangeEvent();
     }
 
     @Override
     public void nestedListExistsRootDeleted(final DatastoreTestTask task) throws InterruptedException,
             ExecutionException {
 
-        AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> change = task.getChangeEvent().get();
+        AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> change = task.getChangeEvent();
 
         assertEmpty(change.getCreatedData());
         assertEmpty(change.getUpdatedData());
@@ -104,7 +101,6 @@ public class WildcardedScopeOneTest extends DefaultDataChangeListenerTestSuite {
 
     @Override
     protected void existingOneNestedWriteAdditionalNested(final DatastoreTestTask task) {
-        Future<?> future = task.getChangeEvent();
         /*
          * One listener should be notified only and only if actual node changed its state,
          * since deletion of nested child (in this case /nested-list/nested-list[foo],
@@ -113,12 +109,11 @@ public class WildcardedScopeOneTest extends DefaultDataChangeListenerTestSuite {
          * and this means settable future containing receivedDataChangeEvent is not done.
          *
          */
-        assertFalse(future.isDone());
+        task.verifyNoChangeEvent();
     }
 
     @Override
     protected void existingTopWriteTwoNested(final DatastoreTestTask task) throws InterruptedException, ExecutionException {
-        Future<?> future = task.getChangeEvent();
         /*
          * One listener should be notified only and only if actual node changed its state,
          * since deletion of nested child (in this case /nested-list/nested-list[foo],
@@ -127,12 +122,12 @@ public class WildcardedScopeOneTest extends DefaultDataChangeListenerTestSuite {
          * and this means settable future containing receivedDataChangeEvent is not done.
          *
          */
-        assertFalse(future.isDone());
+        task.verifyNoChangeEvent();
     }
 
     @Override
     protected void existingTopWriteSibling(final DatastoreTestTask task) throws InterruptedException, ExecutionException {
-        AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> change = task.getChangeEvent().get();
+        AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> change = task.getChangeEvent();
 
         assertContains(change.getCreatedData(), path(FOO_SIBLING));
         assertNotContains(change.getUpdatedData(),path(FOO), TOP_LEVEL);
index 7e67242dd3f941490d54f53bcb96d5987161d01f..a7fa24f2934a8da5dc879236147f6fd35df14cba 100644 (file)
@@ -32,7 +32,7 @@ public class WildcardedScopeSubtreeTest extends DefaultDataChangeListenerTestSui
     @Override
     public void putTopLevelOneNested(final DatastoreTestTask task) throws InterruptedException, ExecutionException {
 
-        AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> change = task.getChangeEvent().get();
+        AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> change = task.getChangeEvent();
 
         assertNotContains(change.getCreatedData(), TOP_LEVEL);
         assertContains(change.getCreatedData(), path(FOO), path(FOO, BAR));
@@ -45,7 +45,7 @@ public class WildcardedScopeSubtreeTest extends DefaultDataChangeListenerTestSui
     public void replaceTopLevelNestedChanged(final DatastoreTestTask task) throws InterruptedException,
             ExecutionException {
 
-        AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> change = task.getChangeEvent().get();
+        AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> change = task.getChangeEvent();
         assertNotNull(change);
 
         assertContains(change.getCreatedData(), path(FOO, BAZ));
@@ -59,7 +59,7 @@ public class WildcardedScopeSubtreeTest extends DefaultDataChangeListenerTestSui
     protected void putTopLevelWithTwoNested(final DatastoreTestTask task) throws InterruptedException,
             ExecutionException {
 
-        AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> change = task.getChangeEvent().get();
+        AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> change = task.getChangeEvent();
         assertNotNull(change);
         assertFalse(change.getCreatedData().isEmpty());
 
@@ -74,7 +74,7 @@ public class WildcardedScopeSubtreeTest extends DefaultDataChangeListenerTestSui
     protected void twoNestedExistsOneIsDeleted(final DatastoreTestTask task) throws InterruptedException,
             ExecutionException {
 
-        AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> change = task.getChangeEvent().get();
+        AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> change = task.getChangeEvent();
         assertNotNull(change);
         assertTrue(change.getCreatedData().isEmpty());
         assertContains(change.getUpdatedData(), path(FOO));
@@ -86,7 +86,7 @@ public class WildcardedScopeSubtreeTest extends DefaultDataChangeListenerTestSui
     public void nestedListExistsRootDeleted(final DatastoreTestTask task) throws InterruptedException,
             ExecutionException {
 
-        AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> change = task.getChangeEvent().get();
+        AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> change = task.getChangeEvent();
 
         assertEmpty(change.getCreatedData());
         assertEmpty(change.getUpdatedData());
@@ -97,7 +97,7 @@ public class WildcardedScopeSubtreeTest extends DefaultDataChangeListenerTestSui
 
     @Override
     protected void existingOneNestedWriteAdditionalNested(final DatastoreTestTask task) throws InterruptedException, ExecutionException {
-        AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> change = task.getChangeEvent().get();
+        AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> change = task.getChangeEvent();
 
         assertContains(change.getCreatedData(), path(FOO,BAZ));
         assertNotContains(change.getCreatedData(), path(FOO,BAR));
@@ -108,7 +108,7 @@ public class WildcardedScopeSubtreeTest extends DefaultDataChangeListenerTestSui
 
     @Override
     protected void existingTopWriteTwoNested(final DatastoreTestTask task) throws InterruptedException, ExecutionException {
-        AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> change = task.getChangeEvent().get();
+        AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> change = task.getChangeEvent();
 
         assertContains(change.getCreatedData(), path(FOO,BAR),path(FOO,BAZ));
         assertContains(change.getUpdatedData(), path(FOO));
@@ -118,7 +118,7 @@ public class WildcardedScopeSubtreeTest extends DefaultDataChangeListenerTestSui
 
     @Override
     protected void existingTopWriteSibling(final DatastoreTestTask task) throws InterruptedException, ExecutionException {
-        AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> change = task.getChangeEvent().get();
+        AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> change = task.getChangeEvent();
 
         assertContains(change.getCreatedData(), path(FOO_SIBLING));
         assertNotContains(change.getUpdatedData(), path(FOO), TOP_LEVEL);