Bug 1430: Off-load notifications from single commit thread
[controller.git] / opendaylight / md-sal / sal-inmemory-datastore / src / main / java / org / opendaylight / controller / md / sal / dom / store / impl / InMemoryDOMDataStore.java
index c44d0909d688773d0cc37034cae21541f0823e6f..b61b3671034601fc09d7658280f986d4d30cc3ce 100644 (file)
@@ -13,11 +13,17 @@ import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.ListeningExecutorService;
+
 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataChangeScope;
+import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeEvent;
 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
 import org.opendaylight.controller.md.sal.common.api.data.OptimisticLockFailedException;
 import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
 import org.opendaylight.controller.md.sal.dom.store.impl.SnapshotBackedWriteTransaction.TransactionReadyPrototype;
+import org.opendaylight.yangtools.util.ExecutorServiceUtil;
+import org.opendaylight.yangtools.util.PropertyUtils;
+import org.opendaylight.yangtools.util.concurrent.NotificationManager;
+import org.opendaylight.yangtools.util.concurrent.QueuedNotificationManager;
 import org.opendaylight.yangtools.yang.data.api.schema.tree.ConflictingModificationAppliedException;
 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
@@ -43,8 +49,11 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import javax.annotation.concurrent.GuardedBy;
+
 import java.util.Collections;
 import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 
 import static com.google.common.base.Preconditions.checkState;
@@ -61,16 +70,51 @@ import static com.google.common.base.Preconditions.checkState;
 public class InMemoryDOMDataStore implements DOMStore, Identifiable<String>, SchemaContextListener,
         TransactionReadyPrototype,AutoCloseable {
     private static final Logger LOG = LoggerFactory.getLogger(InMemoryDOMDataStore.class);
+
+    @SuppressWarnings("rawtypes")
+    private static final QueuedNotificationManager.Invoker<AsyncDataChangeListener,
+                                       AsyncDataChangeEvent> DCL_NOTIFICATION_MGR_INVOKER =
+            new QueuedNotificationManager.Invoker<AsyncDataChangeListener,
+                                                  AsyncDataChangeEvent>() {
+
+                @SuppressWarnings("unchecked")
+                @Override
+                public void invokeListener( AsyncDataChangeListener listener,
+                                            AsyncDataChangeEvent notification ) {
+                    listener.onDataChanged(notification);
+                }
+            };
+
+    private static final String DCL_NOTIFICATION_MGR_MAX_QUEUE_SIZE_PROP =
+            "mdsal.datastore-dcl-notification-queue.size";
+
+    private static final int DEFAULT_DCL_NOTIFICATION_MGR_MAX_QUEUE_SIZE = 1000;
+
     private final DataTree dataTree = InMemoryDataTreeFactory.getInstance().create();
     private final ListenerTree listenerTree = ListenerTree.create();
     private final AtomicLong txCounter = new AtomicLong(0);
-    private final ListeningExecutorService executor;
+    private final ListeningExecutorService listeningExecutor;
+
+    @SuppressWarnings("rawtypes")
+    private final NotificationManager<AsyncDataChangeListener,AsyncDataChangeEvent>
+                                                              dataChangeListenerNotificationManager;
+    private final ExecutorService dataChangeListenerExecutor;
 
     private final String name;
 
-    public InMemoryDOMDataStore(final String name, final ListeningExecutorService executor) {
+    public InMemoryDOMDataStore(final String name, final ListeningExecutorService listeningExecutor,
+            final ExecutorService dataChangeListenerExecutor) {
         this.name = Preconditions.checkNotNull(name);
-        this.executor = Preconditions.checkNotNull(executor);
+        this.listeningExecutor = Preconditions.checkNotNull(listeningExecutor);
+
+        this.dataChangeListenerExecutor = Preconditions.checkNotNull(dataChangeListenerExecutor);
+
+        int maxDCLQueueSize = PropertyUtils.getIntSystemProperty(
+                DCL_NOTIFICATION_MGR_MAX_QUEUE_SIZE_PROP, DEFAULT_DCL_NOTIFICATION_MGR_MAX_QUEUE_SIZE );
+
+        dataChangeListenerNotificationManager =
+                new QueuedNotificationManager<>(this.dataChangeListenerExecutor,
+                        DCL_NOTIFICATION_MGR_INVOKER, maxDCLQueueSize, "DataChangeListenerQueueMgr");
     }
 
     @Override
@@ -104,8 +148,9 @@ public class InMemoryDOMDataStore implements DOMStore, Identifiable<String>, Sch
     }
 
     @Override
-    public void close(){
-        executor.shutdownNow();
+    public void close() {
+        ExecutorServiceUtil.tryGracefulShutdown(listeningExecutor, 30, TimeUnit.SECONDS);
+        ExecutorServiceUtil.tryGracefulShutdown(dataChangeListenerExecutor, 30, TimeUnit.SECONDS);
     }
     @Override
     public <L extends AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>> ListenerRegistration<L> registerChangeListener(
@@ -132,7 +177,9 @@ public class InMemoryDOMDataStore implements DOMStore, Identifiable<String>, Sch
                         .setAfter(data) //
                         .addCreated(path, data) //
                         .build();
-                executor.submit(new ChangeListenerNotifyTask(Collections.singletonList(reg), event));
+
+                new ChangeListenerNotifyTask(Collections.singletonList(reg), event,
+                        dataChangeListenerNotificationManager).run();
             }
         }
 
@@ -221,8 +268,9 @@ public class InMemoryDOMDataStore implements DOMStore, Identifiable<String>, Sch
         @Override
         public void close() {
 
-             executor.shutdownNow();
-
+            // FIXME: this call doesn't look right here - listeningExecutor is shared and owned
+            // by the outer class.
+            //listeningExecutor.shutdownNow();
         }
 
         protected synchronized void onTransactionFailed(final SnapshotBackedWriteTransaction transaction,
@@ -308,7 +356,7 @@ public class InMemoryDOMDataStore implements DOMStore, Identifiable<String>, Sch
 
         @Override
         public ListenableFuture<Boolean> canCommit() {
-            return executor.submit(new Callable<Boolean>() {
+            return listeningExecutor.submit(new Callable<Boolean>() {
                 @Override
                 public Boolean call() throws TransactionCommitFailedException {
                     try {
@@ -330,11 +378,12 @@ public class InMemoryDOMDataStore implements DOMStore, Identifiable<String>, Sch
 
         @Override
         public ListenableFuture<Void> preCommit() {
-            return executor.submit(new Callable<Void>() {
+            return listeningExecutor.submit(new Callable<Void>() {
                 @Override
                 public Void call() {
                     candidate = dataTree.prepare(modification);
-                    listenerResolver = ResolveDataChangeEventsTask.create(candidate, listenerTree);
+                    listenerResolver = ResolveDataChangeEventsTask.create(candidate, listenerTree,
+                            dataChangeListenerNotificationManager);
                     return null;
                 }
             });
@@ -359,7 +408,7 @@ public class InMemoryDOMDataStore implements DOMStore, Identifiable<String>, Sch
 
                 for (ChangeListenerNotifyTask task : listenerResolver.call()) {
                     LOG.trace("Scheduling invocation of listeners: {}", task);
-                    executor.submit(task);
+                    task.run();
                 }
             }