Disconnect NotificationPublisherRegistration 80/105680/2
authorRobert Varga <robert.varga@pantheon.tech>
Tue, 25 Apr 2023 19:43:41 +0000 (21:43 +0200)
committerRobert Varga <robert.varga@pantheon.tech>
Tue, 25 Apr 2023 22:02:19 +0000 (00:02 +0200)
Use a play yangtools.concepts.Registration as the baseline, which allows
us to use AbstractRegistration to manage the tie to the manager -- and
allows for a better thread-safety around concurrent close().

Change-Id: I33871bd59894d4ff95315dddeedcdd7b4e68f962
Signed-off-by: Robert Varga <robert.varga@pantheon.tech>
netconf/mdsal-netconf-notification/src/main/java/org/opendaylight/netconf/mdsal/notification/impl/NetconfNotificationManager.java
netconf/netconf-notifications-api/src/main/java/org/opendaylight/netconf/notifications/NotificationPublisherRegistration.java

index b7b0680e99501bd7d082c482f02fadb5cace3731..99d00f5b269a9f2135000a7cd52c616d1df5ac71 100644 (file)
@@ -16,7 +16,6 @@ import com.google.common.collect.HashMultiset;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Multimap;
 import com.google.common.collect.Multiset;
-import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
@@ -48,6 +47,7 @@ import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.netconf.not
 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.yang.library.rev190104.YangLibraryChange;
 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.yang.library.rev190104.YangLibraryUpdate;
 import org.opendaylight.yangtools.concepts.AbstractObjectRegistration;
+import org.opendaylight.yangtools.concepts.AbstractRegistration;
 import org.opendaylight.yangtools.concepts.Registration;
 import org.opendaylight.yangtools.yang.binding.Notification;
 import org.opendaylight.yangtools.yang.model.api.stmt.SchemaNodeIdentifier.Absolute;
@@ -96,7 +96,7 @@ public final class NetconfNotificationManager implements NetconfNotificationColl
     private final Multiset<StreamNameType> availableStreams = HashMultiset.create();
 
     @GuardedBy("this")
-    private final Set<GenericNotificationPublisherReg> notificationPublishers = new HashSet<>();
+    private final Set<PublisherReg> notificationPublishers = new HashSet<>();
     private final NotificationsTransformUtil transformUtil;
 
     @Inject
@@ -117,10 +117,8 @@ public final class NetconfNotificationManager implements NetconfNotificationColl
         notificationListeners.clear();
 
         // Unregister all publishers
-        // Use new list to avoid ConcurrentModificationException
-        for (final GenericNotificationPublisherReg notificationPublisher : new ArrayList<>(notificationPublishers)) {
-            notificationPublisher.close();
-        }
+        // Use new list to avoid ConcurrentModificationException when the registration removes itself
+        List.copyOf(notificationPublishers).forEach(PublisherReg::close);
         notificationPublishers.clear();
 
         // Clear stream Listeners
@@ -173,42 +171,30 @@ public final class NetconfNotificationManager implements NetconfNotificationColl
 
     @Override
     public synchronized NotificationPublisherRegistration registerNotificationPublisher(final Stream stream) {
-        final StreamNameType streamName = requireNonNull(stream).getName();
+        final var streamName = requireNonNull(stream).getName();
+        final var reg = new PublisherReg(this, streamName);
 
         LOG.debug("Notification publisher registered for stream: {}", streamName);
         if (LOG.isTraceEnabled()) {
             LOG.trace("Notification publisher registered for stream: {}", stream);
         }
 
-        if (streamMetadata.containsKey(streamName)) {
-            LOG.warn("Notification stream {} already registered as: {}. Will be reused", streamName,
-                    streamMetadata.get(streamName));
-        } else {
-            streamMetadata.put(streamName, stream);
+        final var prev = streamMetadata.putIfAbsent(streamName, stream);
+        if (prev != null) {
+            LOG.warn("Notification stream {} already registered as: {}. Will be reused", streamName, prev);
         }
 
         availableStreams.add(streamName);
 
-        final GenericNotificationPublisherReg reg = new GenericNotificationPublisherReg(this, streamName) {
-            @Override
-            public void close() {
-                synchronized (NetconfNotificationManager.this) {
-                    super.close();
-                }
-            }
-        };
-
         notificationPublishers.add(reg);
 
         notifyStreamAdded(stream);
         return reg;
     }
 
-    private void unregisterNotificationPublisher(
-            final StreamNameType streamName,
-            final GenericNotificationPublisherReg genericNotificationPublisherReg) {
+    private synchronized void unregisterNotificationPublisher(final StreamNameType streamName, final PublisherReg reg) {
         availableStreams.remove(streamName);
-        notificationPublishers.remove(genericNotificationPublisherReg);
+        notificationPublishers.remove(reg);
 
         LOG.debug("Notification publisher unregistered for stream: {}", streamName);
 
@@ -246,27 +232,28 @@ public final class NetconfNotificationManager implements NetconfNotificationColl
         return new YangLibraryPublisherReg(transformUtil, notificationPublisherRegistration);
     }
 
-    private static class GenericNotificationPublisherReg implements NotificationPublisherRegistration {
-        private NetconfNotificationManager baseListener;
+    private static final class PublisherReg extends AbstractRegistration implements NotificationPublisherRegistration {
         private final StreamNameType registeredStream;
 
-        GenericNotificationPublisherReg(final NetconfNotificationManager baseListener,
-                                        final StreamNameType registeredStream) {
-            this.baseListener = baseListener;
-            this.registeredStream = registeredStream;
+        private NetconfNotificationManager manager;
+
+        PublisherReg(final NetconfNotificationManager manager, final StreamNameType registeredStream) {
+            this.manager = requireNonNull(manager);
+            this.registeredStream = requireNonNull(registeredStream);
         }
 
         @Override
-        public void close() {
-            baseListener.unregisterNotificationPublisher(registeredStream, this);
-            baseListener = null;
+        public void onNotification(final StreamNameType stream, final NotificationMessage notification) {
+            checkArgument(stream.equals(registeredStream),
+                "Registered on %s, cannot publish to %s", registeredStream, stream);
+            checkState(notClosed(), "Already closed");
+            manager.onNotification(stream, notification);
         }
 
         @Override
-        public void onNotification(final StreamNameType stream, final NotificationMessage notification) {
-            checkState(baseListener != null, "Already closed");
-            checkArgument(stream.equals(registeredStream));
-            baseListener.onNotification(stream, notification);
+        protected void removeRegistration() {
+            manager.unregisterNotificationPublisher(registeredStream, this);
+            manager = null;
         }
     }
 
index f0a35b9794fdd51a6b39820743902537b2ac0060..04a92227c62c78c77668332d27f4f86910a93c3e 100644 (file)
@@ -7,9 +7,11 @@
  */
 package org.opendaylight.netconf.notifications;
 
+import org.opendaylight.yangtools.concepts.Registration;
+
 /**
  * Registration for notification publisher. This registration allows for publishing any netconf notifications
  */
-public interface NotificationPublisherRegistration extends NetconfNotificationListener, NotificationRegistration {
+public interface NotificationPublisherRegistration extends Registration, NetconfNotificationListener {
 
 }