Fix unsubscribe checks in DOMNotificationRouterEvent 87/89487/1
authorRobert Varga <robert.varga@pantheon.tech>
Mon, 27 Apr 2020 11:42:12 +0000 (13:42 +0200)
committerRobert Varga <robert.varga@pantheon.tech>
Tue, 28 Apr 2020 10:16:30 +0000 (12:16 +0200)
The checks here are based on old assumption that the listener
would be set to null, which does not hold true for quite some
time. Update the check to close a simple race.

JIRA: MDSAL-545
Change-Id: I950ffcbcf732d4c24dcad675b81c2889465d9045
Signed-off-by: Robert Varga <robert.varga@pantheon.tech>
(cherry picked from commit 91749a5a5fb089e74306f288d786acb8d3c450ae)

dom/mdsal-dom-broker/src/main/java/org/opendaylight/mdsal/dom/broker/DOMNotificationRouter.java
dom/mdsal-dom-broker/src/main/java/org/opendaylight/mdsal/dom/broker/DOMNotificationRouterEvent.java
dom/mdsal-dom-broker/src/test/java/org/opendaylight/mdsal/dom/broker/DOMNotificationRouterTest.java

index c609f3389cd152542fd3b9614225069926547295..67562a8a3383007cb6702c3558270b6ba4f318f4 100644 (file)
@@ -82,7 +82,7 @@ public class DOMNotificationRouter implements AutoCloseable, DOMNotificationPubl
     private final ScheduledThreadPoolExecutor observer;
     private final ExecutorService executor;
 
-    private volatile Multimap<SchemaPath, ListenerRegistration<? extends DOMNotificationListener>> listeners =
+    private volatile Multimap<SchemaPath, AbstractListenerRegistration<? extends DOMNotificationListener>> listeners =
             ImmutableMultimap.of();
 
     @VisibleForTesting
@@ -113,7 +113,7 @@ public class DOMNotificationRouter implements AutoCloseable, DOMNotificationPubl
     @Override
     public synchronized <T extends DOMNotificationListener> ListenerRegistration<T> registerNotificationListener(
             final T listener, final Collection<SchemaPath> types) {
-        final ListenerRegistration<T> reg = new AbstractListenerRegistration<T>(listener) {
+        final AbstractListenerRegistration<T> reg = new AbstractListenerRegistration<T>(listener) {
             @Override
             protected void removeRegistration() {
                 synchronized (DOMNotificationRouter.this) {
@@ -124,7 +124,7 @@ public class DOMNotificationRouter implements AutoCloseable, DOMNotificationPubl
         };
 
         if (!types.isEmpty()) {
-            final Builder<SchemaPath, ListenerRegistration<? extends DOMNotificationListener>> b =
+            final Builder<SchemaPath, AbstractListenerRegistration<? extends DOMNotificationListener>> b =
                     ImmutableMultimap.builder();
             b.putAll(listeners);
 
@@ -150,7 +150,7 @@ public class DOMNotificationRouter implements AutoCloseable, DOMNotificationPubl
      * @param newListeners is used to notify listenerTypes changed
      */
     private void replaceListeners(
-            final Multimap<SchemaPath, ListenerRegistration<? extends DOMNotificationListener>> newListeners) {
+            final Multimap<SchemaPath, AbstractListenerRegistration<? extends DOMNotificationListener>> newListeners) {
         listeners = newListeners;
         notifyListenerTypesChanged(newListeners.keySet());
     }
@@ -180,7 +180,7 @@ public class DOMNotificationRouter implements AutoCloseable, DOMNotificationPubl
     }
 
     private ListenableFuture<Void> publish(final long seq, final DOMNotification notification,
-            final Collection<ListenerRegistration<? extends DOMNotificationListener>> subscribers) {
+            final Collection<AbstractListenerRegistration<? extends DOMNotificationListener>> subscribers) {
         final DOMNotificationRouterEvent event = disruptor.get(seq);
         final ListenableFuture<Void> future = event.initialize(notification, subscribers);
         disruptor.getRingBuffer().publish(seq);
@@ -190,7 +190,7 @@ public class DOMNotificationRouter implements AutoCloseable, DOMNotificationPubl
     @Override
     public ListenableFuture<? extends Object> putNotification(final DOMNotification notification)
             throws InterruptedException {
-        final Collection<ListenerRegistration<? extends DOMNotificationListener>> subscribers =
+        final Collection<AbstractListenerRegistration<? extends DOMNotificationListener>> subscribers =
                 listeners.get(notification.getType());
         if (subscribers.isEmpty()) {
             return NO_LISTENERS;
@@ -203,7 +203,7 @@ public class DOMNotificationRouter implements AutoCloseable, DOMNotificationPubl
     @SuppressWarnings("checkstyle:IllegalCatch")
     @VisibleForTesting
     ListenableFuture<? extends Object> tryPublish(final DOMNotification notification,
-            final Collection<ListenerRegistration<? extends DOMNotificationListener>> subscribers) {
+            final Collection<AbstractListenerRegistration<? extends DOMNotificationListener>> subscribers) {
         final long seq;
         try {
             seq = disruptor.getRingBuffer().tryNext();
@@ -216,7 +216,7 @@ public class DOMNotificationRouter implements AutoCloseable, DOMNotificationPubl
 
     @Override
     public ListenableFuture<? extends Object> offerNotification(final DOMNotification notification) {
-        final Collection<ListenerRegistration<? extends DOMNotificationListener>> subscribers =
+        final Collection<AbstractListenerRegistration<? extends DOMNotificationListener>> subscribers =
                 listeners.get(notification.getType());
         if (subscribers.isEmpty()) {
             return NO_LISTENERS;
@@ -228,7 +228,7 @@ public class DOMNotificationRouter implements AutoCloseable, DOMNotificationPubl
     @Override
     public ListenableFuture<? extends Object> offerNotification(final DOMNotification notification, final long timeout,
             final TimeUnit unit) throws InterruptedException {
-        final Collection<ListenerRegistration<? extends DOMNotificationListener>> subscribers =
+        final Collection<AbstractListenerRegistration<? extends DOMNotificationListener>> subscribers =
                 listeners.get(notification.getType());
         if (subscribers.isEmpty()) {
             return NO_LISTENERS;
index 6eafd0696fc126235f48c9960f133db865992452..8743fc90d72f038853794a20eb414b7c1a370f81 100644 (file)
@@ -15,7 +15,7 @@ import com.lmax.disruptor.EventFactory;
 import java.util.Collection;
 import org.opendaylight.mdsal.dom.api.DOMNotification;
 import org.opendaylight.mdsal.dom.api.DOMNotificationListener;
-import org.opendaylight.yangtools.concepts.ListenerRegistration;
+import org.opendaylight.yangtools.concepts.AbstractListenerRegistration;
 
 /**
  * A single notification event in the disruptor ringbuffer. These objects are reused,
@@ -24,7 +24,7 @@ import org.opendaylight.yangtools.concepts.ListenerRegistration;
 final class DOMNotificationRouterEvent {
     static final EventFactory<DOMNotificationRouterEvent> FACTORY = DOMNotificationRouterEvent::new;
 
-    private Collection<ListenerRegistration<? extends DOMNotificationListener>> subscribers;
+    private Collection<AbstractListenerRegistration<? extends DOMNotificationListener>> subscribers;
     private DOMNotification notification;
     private SettableFuture<Void> future;
 
@@ -34,7 +34,7 @@ final class DOMNotificationRouterEvent {
 
     @SuppressWarnings("checkstyle:hiddenField")
     ListenableFuture<Void> initialize(final DOMNotification notification,
-            final Collection<ListenerRegistration<? extends DOMNotificationListener>> subscribers) {
+            final Collection<AbstractListenerRegistration<? extends DOMNotificationListener>> subscribers) {
         this.notification = requireNonNull(notification);
         this.subscribers = requireNonNull(subscribers);
         this.future = SettableFuture.create();
@@ -42,10 +42,9 @@ final class DOMNotificationRouterEvent {
     }
 
     void deliverNotification() {
-        for (ListenerRegistration<? extends DOMNotificationListener> r : subscribers) {
-            final DOMNotificationListener l = r.getInstance();
-            if (l != null) {
-                l.onNotification(notification);
+        for (AbstractListenerRegistration<? extends DOMNotificationListener> r : subscribers) {
+            if (r.notClosed()) {
+                r.getInstance().onNotification(notification);
             }
         }
     }
index 4a21a4416edb966f6abd06ac29e68b48a41a9f2a..3717b8a52143a8f0b5f495c013f2348510193222 100644 (file)
@@ -30,7 +30,7 @@ import org.opendaylight.mdsal.dom.api.DOMNotification;
 import org.opendaylight.mdsal.dom.api.DOMNotificationListener;
 import org.opendaylight.mdsal.dom.api.DOMNotificationPublishService;
 import org.opendaylight.mdsal.dom.spi.DOMNotificationSubscriptionListener;
-import org.opendaylight.yangtools.concepts.ListenerRegistration;
+import org.opendaylight.yangtools.concepts.AbstractListenerRegistration;
 import org.opendaylight.yangtools.util.ListenerRegistry;
 import org.opendaylight.yangtools.yang.model.api.SchemaPath;
 
@@ -164,7 +164,7 @@ public class DOMNotificationRouterTest extends TestUtils {
 
         @Override
         protected ListenableFuture<? extends Object> tryPublish(final DOMNotification notification,
-                final Collection<ListenerRegistration<? extends DOMNotificationListener>> subscribers) {
+                final Collection<AbstractListenerRegistration<? extends DOMNotificationListener>> subscribers) {
             return DOMNotificationPublishService.REJECTED;
         }