X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-binding-broker%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fsal%2Fbinding%2Fimpl%2FNotificationBrokerImpl.java;h=258ba517775a2145f3c3046ebbf08194123a0439;hb=085b6e0c317cb5b9fc26abb308829d508973d051;hp=7d844b3bf578c1958b3849e4b4d6841b2fd24ddb;hpb=537799b21364c2567314b54a4fad8c1307ca79f5;p=controller.git diff --git a/opendaylight/md-sal/sal-binding-broker/src/main/java/org/opendaylight/controller/sal/binding/impl/NotificationBrokerImpl.java b/opendaylight/md-sal/sal-binding-broker/src/main/java/org/opendaylight/controller/sal/binding/impl/NotificationBrokerImpl.java index 7d844b3bf5..258ba51777 100644 --- a/opendaylight/md-sal/sal-binding-broker/src/main/java/org/opendaylight/controller/sal/binding/impl/NotificationBrokerImpl.java +++ b/opendaylight/md-sal/sal-binding-broker/src/main/java/org/opendaylight/controller/sal/binding/impl/NotificationBrokerImpl.java @@ -7,11 +7,11 @@ */ package org.opendaylight.controller.sal.binding.impl; -import java.util.Arrays; -import java.util.Collection; -import java.util.HashSet; import java.util.Set; import java.util.concurrent.ExecutorService; +import java.util.concurrent.atomic.AtomicReference; + +import javax.annotation.concurrent.GuardedBy; import org.opendaylight.controller.sal.binding.api.NotificationListener; import org.opendaylight.controller.sal.binding.api.NotificationProviderService; @@ -25,78 +25,87 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.google.common.base.Preconditions; -import com.google.common.base.Predicate; import com.google.common.collect.HashMultimap; -import com.google.common.collect.Iterables; import com.google.common.collect.Multimap; -import com.google.common.collect.Multimaps; public class NotificationBrokerImpl implements NotificationProviderService, AutoCloseable { private static final Logger LOG = LoggerFactory.getLogger(NotificationBrokerImpl.class); private final ListenerRegistry interestListeners = ListenerRegistry.create(); + private final AtomicReference listeners = new AtomicReference<>(new ListenerMapGeneration()); + private final ExecutorService executor; - private final Multimap, NotificationListenerRegistration> listeners = - Multimaps.synchronizedSetMultimap(HashMultimap., NotificationListenerRegistration>create()); - private ExecutorService executor; - - @Deprecated public NotificationBrokerImpl(final ExecutorService executor) { - this.setExecutor(executor); - } - - public void setExecutor(final ExecutorService executor) { this.executor = Preconditions.checkNotNull(executor); } - public Iterable> getNotificationTypes(final Notification notification) { - final Class[] ifaces = notification.getClass().getInterfaces(); - return Iterables.filter(Arrays.asList(ifaces), new Predicate>() { - @Override - public boolean apply(final Class input) { - if (Notification.class.equals(input)) { - return false; - } - return Notification.class.isAssignableFrom(input); - } - }); - } - @Override public void publish(final Notification notification) { - this.publish(notification, executor); + publish(notification, executor); } @Override public void publish(final Notification notification, final ExecutorService service) { - final Set> toNotify = new HashSet<>(); + for (NotificationListenerRegistration r : listeners.get().listenersFor(notification)) { + service.submit(new NotifyTask(r, notification)); + } + } + + @GuardedBy("this") + private Multimap, NotificationListenerRegistration> mutableListeners() { + return HashMultimap.create(listeners.get().getListeners()); + } - for (final Class type : getNotificationTypes(notification)) { - final Collection> l = listeners.get((Class) type); - if (l != null) { - toNotify.addAll(l); + private final void addRegistrations(final NotificationListenerRegistration... registrations) { + synchronized (this) { + final Multimap, NotificationListenerRegistration> newListeners = + mutableListeners(); + for (NotificationListenerRegistration reg : registrations) { + newListeners.put(reg.getType(), reg); } + + listeners.set(new ListenerMapGeneration(newListeners)); } - for (NotificationListenerRegistration r : toNotify) { - service.submit(new NotifyTask(r, notification)); + // Notifications are dispatched out of lock... + for (NotificationListenerRegistration reg : registrations) { + announceNotificationSubscription(reg.getType()); } } - private void addRegistrations(final NotificationListenerRegistration... registrations) { + private synchronized void removeRegistrations(final NotificationListenerRegistration... registrations) { + final Multimap, NotificationListenerRegistration> newListeners = + mutableListeners(); + for (NotificationListenerRegistration reg : registrations) { - listeners.put(reg.getType(), reg); - this.announceNotificationSubscription(reg.getType()); + newListeners.remove(reg.getType(), reg); } + + listeners.set(new ListenerMapGeneration(newListeners)); } - void removeRegistrations(final NotificationListenerRegistration... registrations) { - for (NotificationListenerRegistration reg : registrations) { - listeners.remove(reg.getType(), reg); + private void announceNotificationSubscription(final Class notification) { + for (final ListenerRegistration listener : interestListeners) { + try { + listener.getInstance().onNotificationSubscribtion(notification); + } catch (Exception e) { + LOG.warn("Listener {} reported unexpected error on notification {}", + listener.getInstance(), notification, e); + } } } + @Override + public ListenerRegistration registerInterestListener(final NotificationInterestListener interestListener) { + final ListenerRegistration registration = this.interestListeners.register(interestListener); + + for (final Class notification : listeners.get().getKnownTypes()) { + interestListener.onNotificationSubscribtion(notification); + } + return registration; + } + @Override public NotificationListenerRegistration registerNotificationListener(final Class notificationType, final NotificationListener listener) { final NotificationListenerRegistration reg = new AbstractNotificationListenerRegistration(notificationType, listener) { @@ -110,17 +119,6 @@ public class NotificationBrokerImpl implements NotificationProviderService, Auto return reg; } - private void announceNotificationSubscription(final Class notification) { - for (final ListenerRegistration listener : interestListeners) { - try { - listener.getInstance().onNotificationSubscribtion(notification); - } catch (Exception e) { - LOG.warn("Listener {} reported unexpected error on notification {}", - listener.getInstance(), notification, e); - } - } - } - @Override public ListenerRegistration registerNotificationListener(final org.opendaylight.yangtools.yang.binding.NotificationListener listener) { final NotificationInvoker invoker = SingletonHolder.INVOKER_FACTORY.invokerFor(listener); @@ -158,12 +156,4 @@ public class NotificationBrokerImpl implements NotificationProviderService, Auto public void close() { } - @Override - public ListenerRegistration registerInterestListener(final NotificationInterestListener interestListener) { - final ListenerRegistration registration = this.interestListeners.register(interestListener); - for (final Class notification : listeners.keySet()) { - interestListener.onNotificationSubscribtion(notification); - } - return registration; - } }