Add MXBean to report shard registered DTCL/DCL info
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / DefaultShardDataChangeListenerPublisher.java
index cf98f38d0789d3a8d9cede7beb6d407e2659186f..89da5b2621b7322eda587c889c8c0a376742421c 100644 (file)
@@ -7,8 +7,8 @@
  */
 package org.opendaylight.controller.cluster.datastore;
 
-import com.google.common.base.Stopwatch;
-import java.util.concurrent.TimeUnit;
+import com.google.common.base.Optional;
+import java.util.function.Consumer;
 import javax.annotation.concurrent.NotThreadSafe;
 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataChangeScope;
 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
@@ -16,6 +16,7 @@ import org.opendaylight.controller.md.sal.dom.store.impl.DOMImmutableDataChangeE
 import org.opendaylight.controller.md.sal.dom.store.impl.DataChangeListenerRegistration;
 import org.opendaylight.controller.md.sal.dom.store.impl.ResolveDataChangeEventsTask;
 import org.opendaylight.controller.md.sal.dom.store.impl.tree.ListenerTree;
+import org.opendaylight.yangtools.concepts.ListenerRegistration;
 import org.opendaylight.yangtools.util.concurrent.NotificationManager;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
@@ -35,19 +36,25 @@ final class DefaultShardDataChangeListenerPublisher implements ShardDataChangeLi
     private static final Logger LOG = LoggerFactory.getLogger(DefaultShardDataChangeListenerPublisher.class);
 
     private final ListenerTree dataChangeListenerTree = ListenerTree.create();
-    private final Stopwatch timer = Stopwatch.createUnstarted();
+    private final String logContext;
+
+    DefaultShardDataChangeListenerPublisher(String logContext) {
+        this.logContext = logContext;
+    }
 
     @Override
-    public void submitNotification(final DataChangeListenerRegistration<?> listener, final DOMImmutableDataChangeEvent notification) {
-        LOG.debug("Notifying listener {} about {}", listener.getInstance(), notification);
+    public void submitNotification(final DataChangeListenerRegistration<?> listener,
+            final DOMImmutableDataChangeEvent notification) {
+        LOG.debug("{}: Notifying listener {} about {}", logContext, listener.getInstance(), notification);
 
         listener.getInstance().onDataChanged(notification);
     }
 
     @Override
-    public void submitNotifications(final DataChangeListenerRegistration<?> listener, final Iterable<DOMImmutableDataChangeEvent> notifications) {
+    public void submitNotifications(final DataChangeListenerRegistration<?> listener,
+            final Iterable<DOMImmutableDataChangeEvent> notifications) {
         final AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>> instance = listener.getInstance();
-        LOG.debug("Notifying listener {} about {}", instance, notifications);
+        LOG.debug("{}: Notifying listener {} about {}", logContext, instance, notifications);
 
         for (DOMImmutableDataChangeEvent n : notifications) {
             instance.onDataChanged(n);
@@ -55,33 +62,31 @@ final class DefaultShardDataChangeListenerPublisher implements ShardDataChangeLi
     }
 
     @Override
-    public void publishChanges(DataTreeCandidate candidate, String logContext) {
-        timer.start();
-
-        try {
-            ResolveDataChangeEventsTask.create(candidate, dataChangeListenerTree).resolve(this);
-        } finally {
-            timer.stop();
-            long elapsedTime = timer.elapsed(TimeUnit.MILLISECONDS);
-            if(elapsedTime >= PUBLISH_DELAY_THRESHOLD_IN_MS) {
-                LOG.warn("{}: Generation of DataChange events took longer than expected. Elapsed time: {}",
-                        logContext, timer);
-            } else {
-                LOG.debug("{}: Elapsed time for generation of DataChange events: {}", logContext, timer);
-            }
-
-            timer.reset();
-        }
+    public void publishChanges(DataTreeCandidate candidate) {
+        ResolveDataChangeEventsTask.create(candidate, dataChangeListenerTree).resolve(this);
     }
 
     @Override
-    public <L extends AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>> DataChangeListenerRegistration<L>
-            registerDataChangeListener(YangInstanceIdentifier path, L listener, DataChangeScope scope) {
-        return dataChangeListenerTree.registerDataChangeListener(path, listener, scope);
+    public void registerDataChangeListener(YangInstanceIdentifier path,
+            AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>> listener, DataChangeScope scope,
+            Optional<DataTreeCandidate> initialState,
+            Consumer<ListenerRegistration<AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>>>
+                    onRegistration) {
+        final DataChangeListenerRegistration<AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>>
+                registration = dataChangeListenerTree.registerDataChangeListener(path, listener, scope);
+
+        onRegistration.accept(registration);
+
+        if (initialState.isPresent()) {
+            notifySingleListener(path, listener, scope, initialState.get(), logContext);
+        }
     }
 
-    @Override
-    public ShardDataChangeListenerPublisher newInstance() {
-        return new DefaultShardDataChangeListenerPublisher();
+    static void notifySingleListener(final YangInstanceIdentifier path,
+            final AsyncDataChangeListener<YangInstanceIdentifier,NormalizedNode<?, ?>> listener,
+            final DataChangeScope scope, final DataTreeCandidate initialState, String logContext) {
+        DefaultShardDataChangeListenerPublisher publisher = new DefaultShardDataChangeListenerPublisher(logContext);
+        publisher.registerDataChangeListener(path, listener, scope, Optional.absent(), noop -> { });
+        publisher.publishChanges(initialState);
     }
 }