BUG-1120: fold GenerationalListenerMap back
[controller.git] / opendaylight / md-sal / sal-binding-broker / src / main / java / org / opendaylight / controller / sal / binding / impl / NotificationBrokerImpl.java
index d3b68002c3a64402a23d7fb20ae557165e4f9e72..258ba517775a2145f3c3046ebbf08194123a0439 100644 (file)
@@ -7,90 +7,82 @@
  */
 package org.opendaylight.controller.sal.binding.impl;
 
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashSet;
 import java.util.Set;
 import java.util.concurrent.ExecutorService;
+import java.util.concurrent.atomic.AtomicReference;
+
+import javax.annotation.concurrent.GuardedBy;
 
 import org.opendaylight.controller.sal.binding.api.NotificationListener;
 import org.opendaylight.controller.sal.binding.api.NotificationProviderService;
 import org.opendaylight.controller.sal.binding.codegen.impl.SingletonHolder;
 import org.opendaylight.controller.sal.binding.spi.NotificationInvokerFactory.NotificationInvoker;
+import org.opendaylight.yangtools.concepts.AbstractListenerRegistration;
 import org.opendaylight.yangtools.concepts.ListenerRegistration;
-import org.opendaylight.yangtools.concepts.Registration;
 import org.opendaylight.yangtools.concepts.util.ListenerRegistry;
 import org.opendaylight.yangtools.yang.binding.Notification;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.google.common.base.Preconditions;
-import com.google.common.base.Predicate;
 import com.google.common.collect.HashMultimap;
-import com.google.common.collect.Iterables;
 import com.google.common.collect.Multimap;
-import com.google.common.collect.Multimaps;
 
 public class NotificationBrokerImpl implements NotificationProviderService, AutoCloseable {
     private static final Logger LOG = LoggerFactory.getLogger(NotificationBrokerImpl.class);
 
     private final ListenerRegistry<NotificationInterestListener> interestListeners =
             ListenerRegistry.create();
+    private final AtomicReference<ListenerMapGeneration> listeners = new AtomicReference<>(new ListenerMapGeneration());
+    private final ExecutorService executor;
 
-    private final Multimap<Class<? extends Notification>, NotificationListener<?>> listeners =
-            Multimaps.synchronizedSetMultimap(HashMultimap.<Class<? extends Notification>, NotificationListener<?>>create());
-    private ExecutorService executor;
-
-    @Deprecated
     public NotificationBrokerImpl(final ExecutorService executor) {
-        this.setExecutor(executor);
-    }
-
-    public void setExecutor(final ExecutorService executor) {
         this.executor = Preconditions.checkNotNull(executor);
     }
 
-    public Iterable<Class<?>> getNotificationTypes(final Notification notification) {
-        final Class<?>[] ifaces = notification.getClass().getInterfaces();
-        return Iterables.filter(Arrays.asList(ifaces), new Predicate<Class<?>>() {
-            @Override
-            public boolean apply(final Class<?> input) {
-                if (Notification.class.equals(input)) {
-                    return false;
-                }
-                return Notification.class.isAssignableFrom(input);
-            }
-        });
-    }
-
     @Override
     public void publish(final Notification notification) {
-        this.publish(notification, executor);
+        publish(notification, executor);
     }
 
     @Override
     public void publish(final Notification notification, final ExecutorService service) {
-        Iterable<NotificationListener<?>> listenerToNotify = Collections.emptySet();
-        for (final Class<?> type : getNotificationTypes(notification)) {
-            listenerToNotify = Iterables.concat(listenerToNotify, listeners.get(((Class<? extends Notification>) type)));
+        for (NotificationListenerRegistration<?> r : listeners.get().listenersFor(notification)) {
+            service.submit(new NotifyTask(r, notification));
         }
+    }
+
+    @GuardedBy("this")
+    private Multimap<Class<? extends Notification>, NotificationListenerRegistration<?>> mutableListeners() {
+        return HashMultimap.create(listeners.get().getListeners());
+    }
 
-        final Set<NotifyTask> tasks = new HashSet<>();
-        for (NotificationListener<?> l : listenerToNotify) {
-            tasks.add(new NotifyTask(l, notification));
+    private final void addRegistrations(final NotificationListenerRegistration<?>... registrations) {
+        synchronized (this) {
+            final Multimap<Class<? extends Notification>, NotificationListenerRegistration<?>> newListeners =
+                    mutableListeners();
+            for (NotificationListenerRegistration<?> reg : registrations) {
+                newListeners.put(reg.getType(), reg);
+            }
+
+            listeners.set(new ListenerMapGeneration(newListeners));
         }
 
-        for (final NotifyTask task : tasks) {
-            service.submit(task);
+        // Notifications are dispatched out of lock...
+        for (NotificationListenerRegistration<?> reg : registrations) {
+            announceNotificationSubscription(reg.getType());
         }
     }
 
-    @Override
-    public <T extends Notification> Registration<NotificationListener<T>> registerNotificationListener(final Class<T> notificationType, final NotificationListener<T> listener) {
-        final GenericNotificationRegistration<T> reg = new GenericNotificationRegistration<T>(notificationType, listener, this);
-        this.listeners.put(notificationType, listener);
-        this.announceNotificationSubscription(notificationType);
-        return reg;
+    private synchronized void removeRegistrations(final NotificationListenerRegistration<?>... registrations) {
+        final Multimap<Class<? extends Notification>, NotificationListenerRegistration<?>> newListeners =
+                mutableListeners();
+
+        for (NotificationListenerRegistration<?> reg : registrations) {
+            newListeners.remove(reg.getType(), reg);
+        }
+
+        listeners.set(new ListenerMapGeneration(newListeners));
     }
 
     private void announceNotificationSubscription(final Class<? extends Notification> notification) {
@@ -105,37 +97,63 @@ public class NotificationBrokerImpl implements NotificationProviderService, Auto
     }
 
     @Override
-    public Registration<org.opendaylight.yangtools.yang.binding.NotificationListener> registerNotificationListener(final org.opendaylight.yangtools.yang.binding.NotificationListener listener) {
-        final NotificationInvoker invoker = SingletonHolder.INVOKER_FACTORY.invokerFor(listener);
-        for (final Class<? extends Notification> notifyType : invoker.getSupportedNotifications()) {
-            listeners.put(notifyType, invoker.getInvocationProxy());
-            announceNotificationSubscription(notifyType);
-        }
+    public ListenerRegistration<NotificationInterestListener> registerInterestListener(final NotificationInterestListener interestListener) {
+        final ListenerRegistration<NotificationInterestListener> registration = this.interestListeners.register(interestListener);
 
-        return new GeneratedListenerRegistration(listener, invoker, this);
+        for (final Class<? extends Notification> notification : listeners.get().getKnownTypes()) {
+            interestListener.onNotificationSubscribtion(notification);
+        }
+        return registration;
     }
 
-    protected boolean unregisterListener(final GenericNotificationRegistration<?> reg) {
-        return listeners.remove(reg.getType(), reg.getInstance());
+    @Override
+    public <T extends Notification> NotificationListenerRegistration<T> registerNotificationListener(final Class<T> notificationType, final NotificationListener<T> listener) {
+        final NotificationListenerRegistration<T> reg = new AbstractNotificationListenerRegistration<T>(notificationType, listener) {
+            @Override
+            protected void removeRegistration() {
+                removeRegistrations(this);
+            }
+        };
+
+        addRegistrations(reg);
+        return reg;
     }
 
-    protected void unregisterListener(final GeneratedListenerRegistration reg) {
-        final NotificationInvoker invoker = reg.getInvoker();
-        for (final Class<? extends Notification> notifyType : invoker.getSupportedNotifications()) {
-            this.listeners.remove(notifyType, invoker.getInvocationProxy());
+    @Override
+    public ListenerRegistration<org.opendaylight.yangtools.yang.binding.NotificationListener> registerNotificationListener(final org.opendaylight.yangtools.yang.binding.NotificationListener listener) {
+        final NotificationInvoker invoker = SingletonHolder.INVOKER_FACTORY.invokerFor(listener);
+        final Set<Class<? extends Notification>> types = invoker.getSupportedNotifications();
+        final NotificationListenerRegistration<?>[] regs = new NotificationListenerRegistration<?>[types.size()];
+
+        // Populate the registrations...
+        int i = 0;
+        for (Class<? extends Notification> type : types) {
+            regs[i] = new AggregatedNotificationListenerRegistration<Notification, Object>(type, invoker.getInvocationProxy(), regs) {
+                @Override
+                protected void removeRegistration() {
+                    // Nothing to do, will be cleaned up by parent (below)
+                }
+            };
+            ++i;
         }
+
+        // ... now put them to use ...
+        addRegistrations(regs);
+
+        // ... finally return the parent registration
+        return new AbstractListenerRegistration<org.opendaylight.yangtools.yang.binding.NotificationListener>(listener) {
+            @Override
+            protected void removeRegistration() {
+                removeRegistrations(regs);
+                for (ListenerRegistration<?> reg : regs) {
+                    reg.close();
+                }
+            }
+        };
     }
 
     @Override
     public void close() {
     }
 
-    @Override
-    public ListenerRegistration<NotificationInterestListener> registerInterestListener(final NotificationInterestListener interestListener) {
-        final ListenerRegistration<NotificationInterestListener> registration = this.interestListeners.register(interestListener);
-        for (final Class<? extends Notification> notification : listeners.keySet()) {
-            interestListener.onNotificationSubscribtion(notification);
-        }
-        return registration;
-    }
 }