BUG-8639: always invalidate primary info cache
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / DefaultShardDataChangeListenerPublisher.java
index bf52aa14c99431a378fc663ba9bb2c66c79a2723..89da5b2621b7322eda587c889c8c0a376742421c 100644 (file)
@@ -7,6 +7,8 @@
  */
 package org.opendaylight.controller.cluster.datastore;
 
+import com.google.common.base.Optional;
+import java.util.function.Consumer;
 import javax.annotation.concurrent.NotThreadSafe;
 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataChangeScope;
 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
@@ -14,6 +16,7 @@ import org.opendaylight.controller.md.sal.dom.store.impl.DOMImmutableDataChangeE
 import org.opendaylight.controller.md.sal.dom.store.impl.DataChangeListenerRegistration;
 import org.opendaylight.controller.md.sal.dom.store.impl.ResolveDataChangeEventsTask;
 import org.opendaylight.controller.md.sal.dom.store.impl.tree.ListenerTree;
+import org.opendaylight.yangtools.concepts.ListenerRegistration;
 import org.opendaylight.yangtools.util.concurrent.NotificationManager;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
@@ -33,18 +36,25 @@ final class DefaultShardDataChangeListenerPublisher implements ShardDataChangeLi
     private static final Logger LOG = LoggerFactory.getLogger(DefaultShardDataChangeListenerPublisher.class);
 
     private final ListenerTree dataChangeListenerTree = ListenerTree.create();
+    private final String logContext;
+
+    DefaultShardDataChangeListenerPublisher(String logContext) {
+        this.logContext = logContext;
+    }
 
     @Override
-    public void submitNotification(final DataChangeListenerRegistration<?> listener, final DOMImmutableDataChangeEvent notification) {
-        LOG.debug("Notifying listener {} about {}", listener.getInstance(), notification);
+    public void submitNotification(final DataChangeListenerRegistration<?> listener,
+            final DOMImmutableDataChangeEvent notification) {
+        LOG.debug("{}: Notifying listener {} about {}", logContext, listener.getInstance(), notification);
 
         listener.getInstance().onDataChanged(notification);
     }
 
     @Override
-    public void submitNotifications(final DataChangeListenerRegistration<?> listener, final Iterable<DOMImmutableDataChangeEvent> notifications) {
+    public void submitNotifications(final DataChangeListenerRegistration<?> listener,
+            final Iterable<DOMImmutableDataChangeEvent> notifications) {
         final AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>> instance = listener.getInstance();
-        LOG.debug("Notifying listener {} about {}", instance, notifications);
+        LOG.debug("{}: Notifying listener {} about {}", logContext, instance, notifications);
 
         for (DOMImmutableDataChangeEvent n : notifications) {
             instance.onDataChanged(n);
@@ -52,18 +62,31 @@ final class DefaultShardDataChangeListenerPublisher implements ShardDataChangeLi
     }
 
     @Override
-    public void publishChanges(DataTreeCandidate candidate, String logContext) {
+    public void publishChanges(DataTreeCandidate candidate) {
         ResolveDataChangeEventsTask.create(candidate, dataChangeListenerTree).resolve(this);
     }
 
     @Override
-    public <L extends AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>> DataChangeListenerRegistration<L>
-            registerDataChangeListener(YangInstanceIdentifier path, L listener, DataChangeScope scope) {
-        return dataChangeListenerTree.registerDataChangeListener(path, listener, scope);
+    public void registerDataChangeListener(YangInstanceIdentifier path,
+            AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>> listener, DataChangeScope scope,
+            Optional<DataTreeCandidate> initialState,
+            Consumer<ListenerRegistration<AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>>>
+                    onRegistration) {
+        final DataChangeListenerRegistration<AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>>
+                registration = dataChangeListenerTree.registerDataChangeListener(path, listener, scope);
+
+        onRegistration.accept(registration);
+
+        if (initialState.isPresent()) {
+            notifySingleListener(path, listener, scope, initialState.get(), logContext);
+        }
     }
 
-    @Override
-    public ShardDataChangeListenerPublisher newInstance() {
-        return new DefaultShardDataChangeListenerPublisher();
+    static void notifySingleListener(final YangInstanceIdentifier path,
+            final AsyncDataChangeListener<YangInstanceIdentifier,NormalizedNode<?, ?>> listener,
+            final DataChangeScope scope, final DataTreeCandidate initialState, String logContext) {
+        DefaultShardDataChangeListenerPublisher publisher = new DefaultShardDataChangeListenerPublisher(logContext);
+        publisher.registerDataChangeListener(path, listener, scope, Optional.absent(), noop -> { });
+        publisher.publishChanges(initialState);
     }
 }