X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-binding-broker%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fsal%2Fbinding%2Fimpl%2FNotificationBrokerImpl.java;h=716670f5a38b2960a0c25887360744b149bfad6c;hp=9efd48eabdb9f6889bc45551f38247f37cf9d048;hb=51b43f12739605c4dce1c930f587f64256d90cf0;hpb=c59d59a9ef1fc10e09a85044bb1bd208ff3219f2 diff --git a/opendaylight/md-sal/sal-binding-broker/src/main/java/org/opendaylight/controller/sal/binding/impl/NotificationBrokerImpl.java b/opendaylight/md-sal/sal-binding-broker/src/main/java/org/opendaylight/controller/sal/binding/impl/NotificationBrokerImpl.java index 9efd48eabd..716670f5a3 100644 --- a/opendaylight/md-sal/sal-binding-broker/src/main/java/org/opendaylight/controller/sal/binding/impl/NotificationBrokerImpl.java +++ b/opendaylight/md-sal/sal-binding-broker/src/main/java/org/opendaylight/controller/sal/binding/impl/NotificationBrokerImpl.java @@ -7,82 +7,54 @@ */ package org.opendaylight.controller.sal.binding.impl; +import java.util.Arrays; import java.util.Collection; -import java.util.Collections; +import java.util.HashSet; import java.util.Set; import java.util.concurrent.ExecutorService; -import java.util.concurrent.Future; -import org.eclipse.xtext.xbase.lib.Conversions; -import org.eclipse.xtext.xbase.lib.Functions.Function1; -import org.eclipse.xtext.xbase.lib.IterableExtensions; import org.opendaylight.controller.sal.binding.api.NotificationListener; import org.opendaylight.controller.sal.binding.api.NotificationProviderService; import org.opendaylight.controller.sal.binding.codegen.impl.SingletonHolder; import org.opendaylight.controller.sal.binding.spi.NotificationInvokerFactory.NotificationInvoker; +import org.opendaylight.yangtools.concepts.AbstractListenerRegistration; import org.opendaylight.yangtools.concepts.ListenerRegistration; -import org.opendaylight.yangtools.concepts.Registration; import org.opendaylight.yangtools.concepts.util.ListenerRegistry; import org.opendaylight.yangtools.yang.binding.Notification; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.google.common.base.Objects; import com.google.common.base.Preconditions; +import com.google.common.base.Predicate; import com.google.common.collect.HashMultimap; -import com.google.common.collect.ImmutableSet; -import com.google.common.collect.ImmutableSet.Builder; import com.google.common.collect.Iterables; import com.google.common.collect.Multimap; import com.google.common.collect.Multimaps; -import com.google.common.collect.SetMultimap; public class NotificationBrokerImpl implements NotificationProviderService, AutoCloseable { private static final Logger LOG = LoggerFactory.getLogger(NotificationBrokerImpl.class); private final ListenerRegistry interestListeners = ListenerRegistry.create(); + private final Multimap, NotificationListenerRegistration> listeners = + Multimaps.synchronizedSetMultimap(HashMultimap., NotificationListenerRegistration>create()); + private final ExecutorService executor; - private final Multimap,NotificationListener> listeners; - private ExecutorService executor; - - public NotificationBrokerImpl() { - HashMultimap,NotificationListener> _create = HashMultimap., NotificationListener>create(); - SetMultimap,NotificationListener> _synchronizedSetMultimap = Multimaps., NotificationListener>synchronizedSetMultimap(_create); - this.listeners = _synchronizedSetMultimap; - } - - @Deprecated public NotificationBrokerImpl(final ExecutorService executor) { - HashMultimap,NotificationListener> _create = HashMultimap., NotificationListener>create(); - SetMultimap,NotificationListener> _synchronizedSetMultimap = Multimaps., NotificationListener>synchronizedSetMultimap(_create); - this.listeners = _synchronizedSetMultimap; - this.setExecutor(executor); - } - - public void setExecutor(final ExecutorService executor) { this.executor = Preconditions.checkNotNull(executor); } - public Iterable> getNotificationTypes(final Notification notification) { - Class _class = notification.getClass(); - Class[] _interfaces = _class.getInterfaces(); - final Function1,Boolean> _function = new Function1,Boolean>() { + public Iterable> getNotificationTypes(final Notification notification) { + final Class[] ifaces = notification.getClass().getInterfaces(); + return Iterables.filter(Arrays.asList(ifaces), new Predicate>() { @Override - public Boolean apply(final Class it) { - boolean _and = false; - boolean _notEquals = (!Objects.equal(it, Notification.class)); - if (!_notEquals) { - _and = false; - } else { - boolean _isAssignableFrom = Notification.class.isAssignableFrom(it); - _and = (_notEquals && _isAssignableFrom); + public boolean apply(final Class input) { + if (Notification.class.equals(input)) { + return false; } - return Boolean.valueOf(_and); + return Notification.class.isAssignableFrom(input); } - }; - Iterable> _filter = IterableExtensions.>filter(((Iterable>)Conversions.doWrapArray(_interfaces)), _function); - return _filter; + }); } @Override @@ -92,40 +64,43 @@ public class NotificationBrokerImpl implements NotificationProviderService, Auto @Override public void publish(final Notification notification, final ExecutorService service) { - final Iterable> allTypes = this.getNotificationTypes(notification); - Iterable> listenerToNotify = Collections.>emptySet(); - for (final Class type : allTypes) { - Collection> _get = this.listeners.get(((Class) type)); - Iterable> _plus = Iterables.>concat(listenerToNotify, _get); - listenerToNotify = _plus; - } - final Function1,NotifyTask> _function = new Function1,NotifyTask>() { - @Override - public NotifyTask apply(final NotificationListener it) { - NotifyTask _notifyTask = new NotifyTask(it, notification); - return _notifyTask; + final Set> toNotify = new HashSet<>(); + + for (final Class type : getNotificationTypes(notification)) { + final Collection> l = listeners.get((Class) type); + if (l != null) { + toNotify.addAll(l); } - }; - Iterable _map = IterableExtensions., NotifyTask>map(listenerToNotify, _function); - final Set tasks = IterableExtensions.toSet(_map); - this.submitAll(executor, tasks); + } + + for (NotificationListenerRegistration r : toNotify) { + service.submit(new NotifyTask(r, notification)); + } } - public ImmutableSet> submitAll(final ExecutorService service, final Set tasks) { - final Builder> ret = ImmutableSet.>builder(); - for (final NotifyTask task : tasks) { - Future _submit = service.submit(task); - ret.add(_submit); + private void addRegistrations(final NotificationListenerRegistration... registrations) { + for (NotificationListenerRegistration reg : registrations) { + listeners.put(reg.getType(), reg); + this.announceNotificationSubscription(reg.getType()); + } + } + + void removeRegistrations(final NotificationListenerRegistration... registrations) { + for (NotificationListenerRegistration reg : registrations) { + listeners.remove(reg.getType(), reg); } - return ret.build(); } @Override - public Registration> registerNotificationListener(final Class notificationType, final NotificationListener listener) { - GenericNotificationRegistration _genericNotificationRegistration = new GenericNotificationRegistration(notificationType, listener, this); - final GenericNotificationRegistration reg = _genericNotificationRegistration; - this.listeners.put(notificationType, listener); - this.announceNotificationSubscription(notificationType); + public NotificationListenerRegistration registerNotificationListener(final Class notificationType, final NotificationListener listener) { + final NotificationListenerRegistration reg = new AbstractNotificationListenerRegistration(notificationType, listener) { + @Override + protected void removeRegistration() { + removeRegistrations(this); + } + }; + + addRegistrations(reg); return reg; } @@ -141,36 +116,36 @@ public class NotificationBrokerImpl implements NotificationProviderService, Auto } @Override - public Registration registerNotificationListener(final org.opendaylight.yangtools.yang.binding.NotificationListener listener) { + public ListenerRegistration registerNotificationListener(final org.opendaylight.yangtools.yang.binding.NotificationListener listener) { final NotificationInvoker invoker = SingletonHolder.INVOKER_FACTORY.invokerFor(listener); - Set> _supportedNotifications = invoker.getSupportedNotifications(); - for (final Class notifyType : _supportedNotifications) { - { - NotificationListener _invocationProxy = invoker.getInvocationProxy(); - this.listeners.put(notifyType, _invocationProxy); - this.announceNotificationSubscription(notifyType); - } + final Set> types = invoker.getSupportedNotifications(); + final NotificationListenerRegistration[] regs = new NotificationListenerRegistration[types.size()]; + + // Populate the registrations... + int i = 0; + for (Class type : types) { + regs[i] = new AggregatedNotificationListenerRegistration(type, invoker.getInvocationProxy(), regs) { + @Override + protected void removeRegistration() { + // Nothing to do, will be cleaned up by parent (below) + } + }; + ++i; } - GeneratedListenerRegistration _generatedListenerRegistration = new GeneratedListenerRegistration(listener, invoker, this); - final GeneratedListenerRegistration registration = _generatedListenerRegistration; - return (registration); - } - protected boolean unregisterListener(final GenericNotificationRegistration reg) { - Class _type = reg.getType(); - NotificationListener _instance = reg.getInstance(); - boolean _remove = this.listeners.remove(_type, _instance); - return _remove; - } + // ... now put them to use ... + addRegistrations(regs); - protected void unregisterListener(final GeneratedListenerRegistration reg) { - NotificationInvoker _invoker = reg.getInvoker(); - Set> _supportedNotifications = _invoker.getSupportedNotifications(); - for (final Class notifyType : _supportedNotifications) { - NotificationInvoker _invoker_1 = reg.getInvoker(); - NotificationListener _invocationProxy = _invoker_1.getInvocationProxy(); - this.listeners.remove(notifyType, _invocationProxy); - } + // ... finally return the parent registration + return new AbstractListenerRegistration(listener) { + @Override + protected void removeRegistration() { + removeRegistrations(regs); + for (ListenerRegistration reg : regs) { + reg.close(); + } + } + }; } @Override @@ -180,8 +155,7 @@ public class NotificationBrokerImpl implements NotificationProviderService, Auto @Override public ListenerRegistration registerInterestListener(final NotificationInterestListener interestListener) { final ListenerRegistration registration = this.interestListeners.register(interestListener); - Set> _keySet = this.listeners.keySet(); - for (final Class notification : _keySet) { + for (final Class notification : listeners.keySet()) { interestListener.onNotificationSubscribtion(notification); } return registration;