X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-binding-broker%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fsal%2Fbinding%2Fimpl%2FNotificationBrokerImpl.java;h=fd0b263f8e51cc1028571906cb54da98b727f5f7;hb=63ebb308c97fc06eafe813c347f9c8d7cde541eb;hp=5c7d924d340c07d2c6ecf76030d086c128835391;hpb=5bd4e3c091c2d309243b258eb014014db284f025;p=controller.git diff --git a/opendaylight/md-sal/sal-binding-broker/src/main/java/org/opendaylight/controller/sal/binding/impl/NotificationBrokerImpl.java b/opendaylight/md-sal/sal-binding-broker/src/main/java/org/opendaylight/controller/sal/binding/impl/NotificationBrokerImpl.java index 5c7d924d34..fd0b263f8e 100644 --- a/opendaylight/md-sal/sal-binding-broker/src/main/java/org/opendaylight/controller/sal/binding/impl/NotificationBrokerImpl.java +++ b/opendaylight/md-sal/sal-binding-broker/src/main/java/org/opendaylight/controller/sal/binding/impl/NotificationBrokerImpl.java @@ -7,102 +7,51 @@ */ package org.opendaylight.controller.sal.binding.impl; -import java.util.Collections; import java.util.Set; import java.util.concurrent.ExecutorService; -import java.util.concurrent.Future; -import org.eclipse.xtext.xbase.lib.Conversions; -import org.eclipse.xtext.xbase.lib.Functions.Function1; -import org.eclipse.xtext.xbase.lib.IterableExtensions; import org.opendaylight.controller.sal.binding.api.NotificationListener; import org.opendaylight.controller.sal.binding.api.NotificationProviderService; import org.opendaylight.controller.sal.binding.codegen.impl.SingletonHolder; import org.opendaylight.controller.sal.binding.spi.NotificationInvokerFactory.NotificationInvoker; +import org.opendaylight.yangtools.concepts.AbstractListenerRegistration; import org.opendaylight.yangtools.concepts.ListenerRegistration; -import org.opendaylight.yangtools.concepts.Registration; import org.opendaylight.yangtools.concepts.util.ListenerRegistry; import org.opendaylight.yangtools.yang.binding.Notification; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.google.common.base.Preconditions; -import com.google.common.collect.HashMultimap; -import com.google.common.collect.ImmutableSet; -import com.google.common.collect.ImmutableSet.Builder; -import com.google.common.collect.Iterables; -import com.google.common.collect.Multimap; -import com.google.common.collect.Multimaps; public class NotificationBrokerImpl implements NotificationProviderService, AutoCloseable { private static final Logger LOG = LoggerFactory.getLogger(NotificationBrokerImpl.class); private final ListenerRegistry interestListeners = ListenerRegistry.create(); + private final GenerationalListenerMap listeners = new GenerationalListenerMap(); + private final ExecutorService executor; - private final Multimap, NotificationListener> listeners = - Multimaps.synchronizedSetMultimap(HashMultimap., NotificationListener>create()); - private ExecutorService executor; - - @Deprecated public NotificationBrokerImpl(final ExecutorService executor) { - this.setExecutor(executor); - } - - public void setExecutor(final ExecutorService executor) { this.executor = Preconditions.checkNotNull(executor); } - public Iterable> getNotificationTypes(final Notification notification) { - Class[] _interfaces = notification.getClass().getInterfaces(); - final Function1, Boolean> _function = new Function1, Boolean>() { - @Override - public Boolean apply(final Class it) { - if (Notification.class.equals(it)) { - return false; - } - return Notification.class.isAssignableFrom(it); - } - }; - return IterableExtensions.filter(((Iterable>)Conversions.doWrapArray(_interfaces)), _function); - } - @Override public void publish(final Notification notification) { - this.publish(notification, executor); + publish(notification, executor); } @Override public void publish(final Notification notification, final ExecutorService service) { - Iterable> listenerToNotify = Collections.emptySet(); - for (final Class type : getNotificationTypes(notification)) { - listenerToNotify = Iterables.concat(listenerToNotify, listeners.get(((Class) type))); + for (NotificationListenerRegistration r : listeners.listenersFor(notification)) { + service.submit(new NotifyTask(r, notification)); } - final Function1,NotifyTask> _function = new Function1, NotifyTask>() { - @Override - public NotifyTask apply(final NotificationListener it) { - return new NotifyTask(it, notification); - } - }; - final Set tasks = IterableExtensions.toSet( - IterableExtensions., NotifyTask>map(listenerToNotify, _function)); - this.submitAll(executor, tasks); } - private ImmutableSet> submitAll(final ExecutorService service, final Set tasks) { - final Builder> ret = ImmutableSet.>builder(); - for (final NotifyTask task : tasks) { - ret.add(service.submit(task)); + private final void addRegistrations(final NotificationListenerRegistration... registrations) { + listeners.addRegistrations(registrations); + for (NotificationListenerRegistration reg : registrations) { + announceNotificationSubscription(reg.getType()); } - return ret.build(); - } - - @Override - public Registration> registerNotificationListener(final Class notificationType, final NotificationListener listener) { - final GenericNotificationRegistration reg = new GenericNotificationRegistration(notificationType, listener, this); - this.listeners.put(notificationType, listener); - this.announceNotificationSubscription(notificationType); - return reg; } private void announceNotificationSubscription(final Class notification) { @@ -117,37 +66,62 @@ public class NotificationBrokerImpl implements NotificationProviderService, Auto } @Override - public Registration registerNotificationListener(final org.opendaylight.yangtools.yang.binding.NotificationListener listener) { - final NotificationInvoker invoker = SingletonHolder.INVOKER_FACTORY.invokerFor(listener); - for (final Class notifyType : invoker.getSupportedNotifications()) { - listeners.put(notifyType, invoker.getInvocationProxy()); - announceNotificationSubscription(notifyType); + public ListenerRegistration registerInterestListener(final NotificationInterestListener interestListener) { + final ListenerRegistration registration = this.interestListeners.register(interestListener); + for (final Class notification : listeners.getKnownTypes()) { + interestListener.onNotificationSubscribtion(notification); } - - return new GeneratedListenerRegistration(listener, invoker, this); + return registration; } - protected boolean unregisterListener(final GenericNotificationRegistration reg) { - return listeners.remove(reg.getType(), reg.getInstance()); + @Override + public NotificationListenerRegistration registerNotificationListener(final Class notificationType, final NotificationListener listener) { + final NotificationListenerRegistration reg = new AbstractNotificationListenerRegistration(notificationType, listener) { + @Override + protected void removeRegistration() { + listeners.removeRegistrations(this); + } + }; + + addRegistrations(reg); + return reg; } - protected void unregisterListener(final GeneratedListenerRegistration reg) { - final NotificationInvoker invoker = reg.getInvoker(); - for (final Class notifyType : invoker.getSupportedNotifications()) { - this.listeners.remove(notifyType, invoker.getInvocationProxy()); + @Override + public ListenerRegistration registerNotificationListener(final org.opendaylight.yangtools.yang.binding.NotificationListener listener) { + final NotificationInvoker invoker = SingletonHolder.INVOKER_FACTORY.invokerFor(listener); + final Set> types = invoker.getSupportedNotifications(); + final NotificationListenerRegistration[] regs = new NotificationListenerRegistration[types.size()]; + + // Populate the registrations... + int i = 0; + for (Class type : types) { + regs[i] = new AggregatedNotificationListenerRegistration(type, invoker.getInvocationProxy(), regs) { + @Override + protected void removeRegistration() { + // Nothing to do, will be cleaned up by parent (below) + } + }; + ++i; } + + // ... now put them to use ... + addRegistrations(regs); + + // ... finally return the parent registration + return new AbstractListenerRegistration(listener) { + @Override + protected void removeRegistration() { + listeners.removeRegistrations(regs); + for (ListenerRegistration reg : regs) { + reg.close(); + } + } + }; } @Override public void close() { } - @Override - public ListenerRegistration registerInterestListener(final NotificationInterestListener interestListener) { - final ListenerRegistration registration = this.interestListeners.register(interestListener); - for (final Class notification : listeners.keySet()) { - interestListener.onNotificationSubscribtion(notification); - } - return registration; - } }