BUG-1120: fix race window in unregistration 67/7567/3
authorRobert Varga <rovarga@cisco.com>
Sun, 1 Jun 2014 14:33:52 +0000 (16:33 +0200)
committerRobert Varga <rovarga@cisco.com>
Mon, 2 Jun 2014 09:32:02 +0000 (11:32 +0200)
This has the nice benefit of unifying the registration/unregistration
paths, too. This requires a supporting ehnancement in yangtools, which
has been submitted at https://git.opendaylight.org/gerrit/#/c/7568/

Change-Id: Ifd489d6eaf5b7feb4a6efaa09bd468d5818c24cc
Signed-off-by: Robert Varga <rovarga@cisco.com>
opendaylight/md-sal/sal-binding-broker/src/main/java/org/opendaylight/controller/sal/binding/impl/AbstractNotificationListenerRegistration.java [new file with mode: 0644]
opendaylight/md-sal/sal-binding-broker/src/main/java/org/opendaylight/controller/sal/binding/impl/AggregatedNotificationListenerRegistration.java [new file with mode: 0644]
opendaylight/md-sal/sal-binding-broker/src/main/java/org/opendaylight/controller/sal/binding/impl/GeneratedListenerRegistration.java [deleted file]
opendaylight/md-sal/sal-binding-broker/src/main/java/org/opendaylight/controller/sal/binding/impl/GenericNotificationRegistration.java [deleted file]
opendaylight/md-sal/sal-binding-broker/src/main/java/org/opendaylight/controller/sal/binding/impl/NotificationBrokerImpl.java
opendaylight/md-sal/sal-binding-broker/src/main/java/org/opendaylight/controller/sal/binding/impl/NotificationListenerRegistration.java [new file with mode: 0644]
opendaylight/md-sal/sal-binding-broker/src/main/java/org/opendaylight/controller/sal/binding/impl/NotifyTask.java

diff --git a/opendaylight/md-sal/sal-binding-broker/src/main/java/org/opendaylight/controller/sal/binding/impl/AbstractNotificationListenerRegistration.java b/opendaylight/md-sal/sal-binding-broker/src/main/java/org/opendaylight/controller/sal/binding/impl/AbstractNotificationListenerRegistration.java
new file mode 100644 (file)
index 0000000..5e7c913
--- /dev/null
@@ -0,0 +1,41 @@
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.sal.binding.impl;
+
+import org.opendaylight.controller.sal.binding.api.NotificationListener;
+import org.opendaylight.yangtools.concepts.AbstractListenerRegistration;
+import org.opendaylight.yangtools.yang.binding.Notification;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * Abstract implementation of {@link NotificationListenerRegistration}.
+ *
+ * @param <T> Notification type
+ */
+abstract class AbstractNotificationListenerRegistration<T extends Notification> extends AbstractListenerRegistration<NotificationListener<T>> implements NotificationListenerRegistration<T> {
+    private final Class<? extends Notification> type;
+
+    protected AbstractNotificationListenerRegistration(final Class<? extends Notification> type, final NotificationListener<T> listener) {
+        super(listener);
+        this.type = Preconditions.checkNotNull(type);
+    }
+
+    @Override
+    public Class<? extends Notification> getType() {
+        return type;
+    }
+
+    @Override
+    @SuppressWarnings("unchecked")
+    public void notify(final Notification notification) {
+        if (!isClosed()) {
+            getInstance().onNotification((T)notification);
+        }
+    }
+}
diff --git a/opendaylight/md-sal/sal-binding-broker/src/main/java/org/opendaylight/controller/sal/binding/impl/AggregatedNotificationListenerRegistration.java b/opendaylight/md-sal/sal-binding-broker/src/main/java/org/opendaylight/controller/sal/binding/impl/AggregatedNotificationListenerRegistration.java
new file mode 100644 (file)
index 0000000..f0db891
--- /dev/null
@@ -0,0 +1,57 @@
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.sal.binding.impl;
+
+import org.opendaylight.controller.sal.binding.api.NotificationListener;
+import org.opendaylight.yangtools.yang.binding.Notification;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * An aggregated listener registration. This is a result of registering an invoker which can handle multiple
+ * interfaces at the same time. In order to support correct delivery, we need to maintain per-type registrations
+ * which get squashed if a notification which implements multiple interfaces is encountered.
+ *
+ * We take care of that by implementing alternate {@link #hashCode()}/{@link #equals(Object)}, which resolve
+ * to the backing aggregator.
+ *
+ * @param <N> Notification type
+ * @param <A> Aggregator type
+ */
+abstract class AggregatedNotificationListenerRegistration<N extends Notification, A> extends AbstractNotificationListenerRegistration<N> {
+    private final A aggregator;
+
+    protected AggregatedNotificationListenerRegistration(final Class<? extends Notification> type, final NotificationListener<N> listener, final A aggregator) {
+        super(type, listener);
+        this.aggregator = Preconditions.checkNotNull(aggregator);
+    }
+
+    protected A getAggregator() {
+        return aggregator;
+    }
+
+    @Override
+    public int hashCode() {
+        return aggregator.hashCode();
+    }
+
+    @Override
+    public boolean equals(final Object obj) {
+        if (this == obj) {
+            return true;
+        }
+        if (obj == null) {
+            return false;
+        }
+        if (!this.getClass().equals(obj.getClass())) {
+            return false;
+        }
+
+        return aggregator.equals(((AggregatedNotificationListenerRegistration<?, ?>)obj).aggregator);
+    }
+}
diff --git a/opendaylight/md-sal/sal-binding-broker/src/main/java/org/opendaylight/controller/sal/binding/impl/GeneratedListenerRegistration.java b/opendaylight/md-sal/sal-binding-broker/src/main/java/org/opendaylight/controller/sal/binding/impl/GeneratedListenerRegistration.java
deleted file mode 100644 (file)
index 5325ed3..0000000
+++ /dev/null
@@ -1,39 +0,0 @@
-/**
- * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-package org.opendaylight.controller.sal.binding.impl;
-
-import org.opendaylight.controller.sal.binding.spi.NotificationInvokerFactory.NotificationInvoker;
-import org.opendaylight.yangtools.concepts.AbstractObjectRegistration;
-import org.opendaylight.yangtools.concepts.ListenerRegistration;
-import org.opendaylight.yangtools.yang.binding.NotificationListener;
-
-import com.google.common.base.Preconditions;
-
-class GeneratedListenerRegistration extends AbstractObjectRegistration<NotificationListener> implements ListenerRegistration<NotificationListener> {
-    private NotificationBrokerImpl notificationBroker;
-    private final NotificationInvoker invoker;
-
-    public GeneratedListenerRegistration(final NotificationListener instance, final NotificationInvoker invoker, final NotificationBrokerImpl broker) {
-        super(instance);
-        this.invoker = Preconditions.checkNotNull(invoker);
-        this.notificationBroker = Preconditions.checkNotNull(broker);
-    }
-
-    public NotificationInvoker getInvoker() {
-        // There is a race with NotificationBrokerImpl:
-        // the invoker can be closed here
-        return invoker;
-    }
-
-    @Override
-    protected void removeRegistration() {
-        notificationBroker.unregisterListener(this);
-        notificationBroker = null;
-        invoker.close();
-    }
-}
diff --git a/opendaylight/md-sal/sal-binding-broker/src/main/java/org/opendaylight/controller/sal/binding/impl/GenericNotificationRegistration.java b/opendaylight/md-sal/sal-binding-broker/src/main/java/org/opendaylight/controller/sal/binding/impl/GenericNotificationRegistration.java
deleted file mode 100644 (file)
index 448adfa..0000000
+++ /dev/null
@@ -1,36 +0,0 @@
-/**
- * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-package org.opendaylight.controller.sal.binding.impl;
-
-import org.opendaylight.controller.sal.binding.api.NotificationListener;
-import org.opendaylight.yangtools.concepts.AbstractObjectRegistration;
-import org.opendaylight.yangtools.concepts.ListenerRegistration;
-import org.opendaylight.yangtools.yang.binding.Notification;
-
-import com.google.common.base.Preconditions;
-
-class GenericNotificationRegistration<T extends Notification> extends AbstractObjectRegistration<NotificationListener<T>> implements ListenerRegistration<NotificationListener<T>> {
-    private final Class<T> type;
-    private NotificationBrokerImpl notificationBroker;
-
-    public GenericNotificationRegistration(final Class<T> type, final NotificationListener<T> instance, final NotificationBrokerImpl broker) {
-        super(instance);
-        this.type = Preconditions.checkNotNull(type);
-        this.notificationBroker = Preconditions.checkNotNull(broker);
-    }
-
-    public Class<T> getType() {
-        return type;
-    }
-
-    @Override
-    protected void removeRegistration() {
-        notificationBroker.unregisterListener(this);
-        notificationBroker = null;
-    }
-}
index d3b68002c3a64402a23d7fb20ae557165e4f9e72..7d844b3bf578c1958b3849e4b4d6841b2fd24ddb 100644 (file)
@@ -8,7 +8,7 @@
 package org.opendaylight.controller.sal.binding.impl;
 
 import java.util.Arrays;
-import java.util.Collections;
+import java.util.Collection;
 import java.util.HashSet;
 import java.util.Set;
 import java.util.concurrent.ExecutorService;
@@ -17,8 +17,8 @@ import org.opendaylight.controller.sal.binding.api.NotificationListener;
 import org.opendaylight.controller.sal.binding.api.NotificationProviderService;
 import org.opendaylight.controller.sal.binding.codegen.impl.SingletonHolder;
 import org.opendaylight.controller.sal.binding.spi.NotificationInvokerFactory.NotificationInvoker;
+import org.opendaylight.yangtools.concepts.AbstractListenerRegistration;
 import org.opendaylight.yangtools.concepts.ListenerRegistration;
-import org.opendaylight.yangtools.concepts.Registration;
 import org.opendaylight.yangtools.concepts.util.ListenerRegistry;
 import org.opendaylight.yangtools.yang.binding.Notification;
 import org.slf4j.Logger;
@@ -37,8 +37,8 @@ public class NotificationBrokerImpl implements NotificationProviderService, Auto
     private final ListenerRegistry<NotificationInterestListener> interestListeners =
             ListenerRegistry.create();
 
-    private final Multimap<Class<? extends Notification>, NotificationListener<?>> listeners =
-            Multimaps.synchronizedSetMultimap(HashMultimap.<Class<? extends Notification>, NotificationListener<?>>create());
+    private final Multimap<Class<? extends Notification>, NotificationListenerRegistration<?>> listeners =
+            Multimaps.synchronizedSetMultimap(HashMultimap.<Class<? extends Notification>, NotificationListenerRegistration<?>>create());
     private ExecutorService executor;
 
     @Deprecated
@@ -70,26 +70,43 @@ public class NotificationBrokerImpl implements NotificationProviderService, Auto
 
     @Override
     public void publish(final Notification notification, final ExecutorService service) {
-        Iterable<NotificationListener<?>> listenerToNotify = Collections.emptySet();
+        final Set<NotificationListenerRegistration<?>> toNotify = new HashSet<>();
+
         for (final Class<?> type : getNotificationTypes(notification)) {
-            listenerToNotify = Iterables.concat(listenerToNotify, listeners.get(((Class<? extends Notification>) type)));
+            final Collection<NotificationListenerRegistration<?>> l = listeners.get((Class<? extends Notification>) type);
+            if (l != null) {
+                toNotify.addAll(l);
+            }
         }
 
-        final Set<NotifyTask> tasks = new HashSet<>();
-        for (NotificationListener<?> l : listenerToNotify) {
-            tasks.add(new NotifyTask(l, notification));
+        for (NotificationListenerRegistration<?> r : toNotify) {
+            service.submit(new NotifyTask(r, notification));
         }
+    }
 
-        for (final NotifyTask task : tasks) {
-            service.submit(task);
+    private void addRegistrations(final NotificationListenerRegistration<?>... registrations) {
+        for (NotificationListenerRegistration<?> reg : registrations) {
+            listeners.put(reg.getType(), reg);
+            this.announceNotificationSubscription(reg.getType());
+        }
+    }
+
+    void removeRegistrations(final NotificationListenerRegistration<?>... registrations) {
+        for (NotificationListenerRegistration<?> reg : registrations) {
+            listeners.remove(reg.getType(), reg);
         }
     }
 
     @Override
-    public <T extends Notification> Registration<NotificationListener<T>> registerNotificationListener(final Class<T> notificationType, final NotificationListener<T> listener) {
-        final GenericNotificationRegistration<T> reg = new GenericNotificationRegistration<T>(notificationType, listener, this);
-        this.listeners.put(notificationType, listener);
-        this.announceNotificationSubscription(notificationType);
+    public <T extends Notification> NotificationListenerRegistration<T> registerNotificationListener(final Class<T> notificationType, final NotificationListener<T> listener) {
+        final NotificationListenerRegistration<T> reg = new AbstractNotificationListenerRegistration<T>(notificationType, listener) {
+            @Override
+            protected void removeRegistration() {
+                removeRegistrations(this);
+            }
+        };
+
+        addRegistrations(reg);
         return reg;
     }
 
@@ -105,25 +122,36 @@ public class NotificationBrokerImpl implements NotificationProviderService, Auto
     }
 
     @Override
-    public Registration<org.opendaylight.yangtools.yang.binding.NotificationListener> registerNotificationListener(final org.opendaylight.yangtools.yang.binding.NotificationListener listener) {
+    public ListenerRegistration<org.opendaylight.yangtools.yang.binding.NotificationListener> registerNotificationListener(final org.opendaylight.yangtools.yang.binding.NotificationListener listener) {
         final NotificationInvoker invoker = SingletonHolder.INVOKER_FACTORY.invokerFor(listener);
-        for (final Class<? extends Notification> notifyType : invoker.getSupportedNotifications()) {
-            listeners.put(notifyType, invoker.getInvocationProxy());
-            announceNotificationSubscription(notifyType);
+        final Set<Class<? extends Notification>> types = invoker.getSupportedNotifications();
+        final NotificationListenerRegistration<?>[] regs = new NotificationListenerRegistration<?>[types.size()];
+
+        // Populate the registrations...
+        int i = 0;
+        for (Class<? extends Notification> type : types) {
+            regs[i] = new AggregatedNotificationListenerRegistration<Notification, Object>(type, invoker.getInvocationProxy(), regs) {
+                @Override
+                protected void removeRegistration() {
+                    // Nothing to do, will be cleaned up by parent (below)
+                }
+            };
+            ++i;
         }
 
-        return new GeneratedListenerRegistration(listener, invoker, this);
-    }
-
-    protected boolean unregisterListener(final GenericNotificationRegistration<?> reg) {
-        return listeners.remove(reg.getType(), reg.getInstance());
-    }
+        // ... now put them to use ...
+        addRegistrations(regs);
 
-    protected void unregisterListener(final GeneratedListenerRegistration reg) {
-        final NotificationInvoker invoker = reg.getInvoker();
-        for (final Class<? extends Notification> notifyType : invoker.getSupportedNotifications()) {
-            this.listeners.remove(notifyType, invoker.getInvocationProxy());
-        }
+        // ... finally return the parent registration
+        return new AbstractListenerRegistration<org.opendaylight.yangtools.yang.binding.NotificationListener>(listener) {
+            @Override
+            protected void removeRegistration() {
+                removeRegistrations(regs);
+                for (ListenerRegistration<?> reg : regs) {
+                    reg.close();
+                }
+            }
+        };
     }
 
     @Override
diff --git a/opendaylight/md-sal/sal-binding-broker/src/main/java/org/opendaylight/controller/sal/binding/impl/NotificationListenerRegistration.java b/opendaylight/md-sal/sal-binding-broker/src/main/java/org/opendaylight/controller/sal/binding/impl/NotificationListenerRegistration.java
new file mode 100644 (file)
index 0000000..3dba868
--- /dev/null
@@ -0,0 +1,35 @@
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.sal.binding.impl;
+
+import org.opendaylight.controller.sal.binding.api.NotificationListener;
+import org.opendaylight.yangtools.concepts.ListenerRegistration;
+import org.opendaylight.yangtools.yang.binding.Notification;
+
+/**
+ * A registration of a {@link NotificationListener}. Allows query of the type
+ * of the notification and dispatching the notification atomically with regard
+ * to unregistration.
+ *
+ * @param <T> Type of notification
+ */
+interface NotificationListenerRegistration<T extends Notification> extends ListenerRegistration<NotificationListener<T>> {
+    /**
+     * Return the interface class of the notification type.
+     *
+     * @return Notification type.
+     */
+    Class<? extends Notification> getType();
+
+    /**
+     * Dispatch a notification to the listener.
+     *
+     * @param notification Notification to be dispatched
+     */
+    void notify(Notification notification);
+}
index 5f0de6bc16c8a18ba1fc925b3a84693c85d06536..2622a71e55200c0a3ca2fafac2ad3a3c0f324ad0 100644 (file)
@@ -7,7 +7,6 @@
  */
 package org.opendaylight.controller.sal.binding.impl;
 
-import org.opendaylight.controller.sal.binding.api.NotificationListener;
 import org.opendaylight.yangtools.yang.binding.Notification;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -18,37 +17,37 @@ import com.google.common.base.Preconditions;
 class NotifyTask implements Runnable {
     private static final Logger LOG = LoggerFactory.getLogger(NotifyTask.class);
 
-    private final NotificationListener<?> listener;
+    private final NotificationListenerRegistration<?> registration;
     private final Notification notification;
 
-    public NotifyTask(final NotificationListener<?> listener, final Notification notification) {
-        this.listener = Preconditions.checkNotNull(listener);
+    public NotifyTask(final NotificationListenerRegistration<?> registration, final Notification notification) {
+        this.registration = Preconditions.checkNotNull(registration);
         this.notification = Preconditions.checkNotNull(notification);
     }
 
     @SuppressWarnings("unchecked")
-    private <T extends Notification> NotificationListener<T> getListener() {
-        return (NotificationListener<T>)listener;
+    private <T extends Notification> NotificationListenerRegistration<T> getRegistration() {
+        return (NotificationListenerRegistration<T>)registration;
     }
 
     @Override
     public void run() {
         if (LOG.isDebugEnabled()) {
-            LOG.debug("Delivering notification {} to {}", notification, listener);
+            LOG.debug("Delivering notification {} to {}", notification, registration.getInstance());
         } else {
-            LOG.trace("Delivering notification {} to {}", notification.getClass().getName(), listener);
+            LOG.trace("Delivering notification {} to {}", notification.getClass().getName(), registration.getInstance());
         }
 
         try {
-            getListener().onNotification(notification);
+            getRegistration().notify(notification);
         } catch (final Exception e) {
-            LOG.error("Unhandled exception thrown by listener: {}", listener, e);
+            LOG.error("Unhandled exception thrown by listener: {}", registration.getInstance(), e);
         }
 
         if (LOG.isDebugEnabled()) {
-            LOG.debug("Notification delivered {} to {}", notification, listener);
+            LOG.debug("Notification delivered {} to {}", notification, registration.getInstance());
         } else {
-            LOG.trace("Notification delivered {} to {}", notification.getClass().getName(), listener);
+            LOG.trace("Notification delivered {} to {}", notification.getClass().getName(), registration.getInstance());
         }
     }
 
@@ -56,7 +55,7 @@ class NotifyTask implements Runnable {
     public int hashCode() {
         final int prime = 31;
         int result = 1;
-        result = prime * result + ((listener== null) ? 0 : listener.hashCode());
+        result = prime * result + ((registration== null) ? 0 : registration.hashCode());
         result = prime * result + ((notification== null) ? 0 : notification.hashCode());
         return result;
     }
@@ -70,10 +69,10 @@ class NotifyTask implements Runnable {
         if (getClass() != obj.getClass())
             return false;
         NotifyTask other = (NotifyTask) obj;
-        if (listener == null) {
-            if (other.listener != null)
+        if (registration == null) {
+            if (other.registration != null)
                 return false;
-        } else if (!listener.equals(other.listener))
+        } else if (!registration.equals(other.registration))
             return false;
         if (notification == null) {
             if (other.notification != null)
@@ -86,7 +85,7 @@ class NotifyTask implements Runnable {
     @Override
     public String toString() {
         return Objects.toStringHelper(this)
-                .add("listener", listener)
+                .add("listener", registration)
                 .add("notification", notification.getClass())
                 .toString();
     }