X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-binding-broker%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fsal%2Fbinding%2Fimpl%2FNotificationBrokerImpl.java;h=fd0b263f8e51cc1028571906cb54da98b727f5f7;hp=6de878f85138cb7b2a6e5eee02ac3c1659d73db3;hb=67ee036916da9ce71eb8303c2f13b52d7e6c98b5;hpb=357ebddddbb29d3ed35685516dc184e0603e2def diff --git a/opendaylight/md-sal/sal-binding-broker/src/main/java/org/opendaylight/controller/sal/binding/impl/NotificationBrokerImpl.java b/opendaylight/md-sal/sal-binding-broker/src/main/java/org/opendaylight/controller/sal/binding/impl/NotificationBrokerImpl.java index 6de878f851..fd0b263f8e 100644 --- a/opendaylight/md-sal/sal-binding-broker/src/main/java/org/opendaylight/controller/sal/binding/impl/NotificationBrokerImpl.java +++ b/opendaylight/md-sal/sal-binding-broker/src/main/java/org/opendaylight/controller/sal/binding/impl/NotificationBrokerImpl.java @@ -7,208 +7,121 @@ */ package org.opendaylight.controller.sal.binding.impl; -import java.util.Collection; -import java.util.Collections; import java.util.Set; import java.util.concurrent.ExecutorService; -import java.util.concurrent.Future; -import org.eclipse.xtext.xbase.lib.Conversions; -import org.eclipse.xtext.xbase.lib.Exceptions; -import org.eclipse.xtext.xbase.lib.Functions.Function0; -import org.eclipse.xtext.xbase.lib.Functions.Function1; -import org.eclipse.xtext.xbase.lib.IterableExtensions; import org.opendaylight.controller.sal.binding.api.NotificationListener; import org.opendaylight.controller.sal.binding.api.NotificationProviderService; import org.opendaylight.controller.sal.binding.codegen.impl.SingletonHolder; import org.opendaylight.controller.sal.binding.spi.NotificationInvokerFactory.NotificationInvoker; +import org.opendaylight.yangtools.concepts.AbstractListenerRegistration; import org.opendaylight.yangtools.concepts.ListenerRegistration; -import org.opendaylight.yangtools.concepts.Registration; import org.opendaylight.yangtools.concepts.util.ListenerRegistry; import org.opendaylight.yangtools.yang.binding.Notification; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.google.common.base.Objects; -import com.google.common.collect.HashMultimap; -import com.google.common.collect.ImmutableSet; -import com.google.common.collect.ImmutableSet.Builder; -import com.google.common.collect.Iterables; -import com.google.common.collect.Multimap; -import com.google.common.collect.Multimaps; -import com.google.common.collect.SetMultimap; +import com.google.common.base.Preconditions; public class NotificationBrokerImpl implements NotificationProviderService, AutoCloseable { - private final ListenerRegistry interestListeners = new Function0>() { - @Override - public ListenerRegistry apply() { - ListenerRegistry _create = ListenerRegistry.create(); - return _create; - } - }.apply(); - - private final Multimap,NotificationListener> listeners; + private static final Logger LOG = LoggerFactory.getLogger(NotificationBrokerImpl.class); - private ExecutorService _executor; + private final ListenerRegistry interestListeners = + ListenerRegistry.create(); + private final GenerationalListenerMap listeners = new GenerationalListenerMap(); + private final ExecutorService executor; - public ExecutorService getExecutor() { - return this._executor; + public NotificationBrokerImpl(final ExecutorService executor) { + this.executor = Preconditions.checkNotNull(executor); } - public void setExecutor(final ExecutorService executor) { - this._executor = executor; + @Override + public void publish(final Notification notification) { + publish(notification, executor); } - private final Logger logger = new Function0() { - @Override - public Logger apply() { - Logger _logger = LoggerFactory.getLogger(NotificationBrokerImpl.class); - return _logger; + @Override + public void publish(final Notification notification, final ExecutorService service) { + for (NotificationListenerRegistration r : listeners.listenersFor(notification)) { + service.submit(new NotifyTask(r, notification)); } - }.apply(); - - public NotificationBrokerImpl() { - HashMultimap,NotificationListener> _create = HashMultimap., NotificationListener>create(); - SetMultimap,NotificationListener> _synchronizedSetMultimap = Multimaps., NotificationListener>synchronizedSetMultimap(_create); - this.listeners = _synchronizedSetMultimap; } - @Deprecated - public NotificationBrokerImpl(final ExecutorService executor) { - HashMultimap,NotificationListener> _create = HashMultimap., NotificationListener>create(); - SetMultimap,NotificationListener> _synchronizedSetMultimap = Multimaps., NotificationListener>synchronizedSetMultimap(_create); - this.listeners = _synchronizedSetMultimap; - this.setExecutor(executor); + private final void addRegistrations(final NotificationListenerRegistration... registrations) { + listeners.addRegistrations(registrations); + for (NotificationListenerRegistration reg : registrations) { + announceNotificationSubscription(reg.getType()); + } } - public Iterable> getNotificationTypes(final Notification notification) { - Class _class = notification.getClass(); - Class[] _interfaces = _class.getInterfaces(); - final Function1,Boolean> _function = new Function1,Boolean>() { - @Override - public Boolean apply(final Class it) { - boolean _and = false; - boolean _notEquals = (!Objects.equal(it, Notification.class)); - if (!_notEquals) { - _and = false; - } else { - boolean _isAssignableFrom = Notification.class.isAssignableFrom(it); - _and = (_notEquals && _isAssignableFrom); - } - return Boolean.valueOf(_and); + private void announceNotificationSubscription(final Class notification) { + for (final ListenerRegistration listener : interestListeners) { + try { + listener.getInstance().onNotificationSubscribtion(notification); + } catch (Exception e) { + LOG.warn("Listener {} reported unexpected error on notification {}", + listener.getInstance(), notification, e); } - }; - Iterable> _filter = IterableExtensions.>filter(((Iterable>)Conversions.doWrapArray(_interfaces)), _function); - return _filter; + } } @Override - public void publish(final Notification notification) { - ExecutorService _executor = this.getExecutor(); - this.publish(notification, _executor); + public ListenerRegistration registerInterestListener(final NotificationInterestListener interestListener) { + final ListenerRegistration registration = this.interestListeners.register(interestListener); + for (final Class notification : listeners.getKnownTypes()) { + interestListener.onNotificationSubscribtion(notification); + } + return registration; } @Override - public void publish(final Notification notification, final ExecutorService service) { - final Iterable> allTypes = this.getNotificationTypes(notification); - Iterable> listenerToNotify = Collections.>emptySet(); - for (final Class type : allTypes) { - Collection> _get = this.listeners.get(((Class) type)); - Iterable> _plus = Iterables.>concat(listenerToNotify, _get); - listenerToNotify = _plus; - } - final Function1,NotifyTask> _function = new Function1,NotifyTask>() { + public NotificationListenerRegistration registerNotificationListener(final Class notificationType, final NotificationListener listener) { + final NotificationListenerRegistration reg = new AbstractNotificationListenerRegistration(notificationType, listener) { @Override - public NotifyTask apply(final NotificationListener it) { - NotifyTask _notifyTask = new NotifyTask(it, notification); - return _notifyTask; + protected void removeRegistration() { + listeners.removeRegistrations(this); } }; - Iterable _map = IterableExtensions., NotifyTask>map(listenerToNotify, _function); - final Set tasks = IterableExtensions.toSet(_map); - ExecutorService _executor = this.getExecutor(); - this.submitAll(_executor, tasks); - } - - public ImmutableSet> submitAll(final ExecutorService service, final Set tasks) { - final Builder> ret = ImmutableSet.>builder(); - for (final NotifyTask task : tasks) { - Future _submit = service.submit(task); - ret.add(_submit); - } - return ret.build(); - } - @Override - public Registration> registerNotificationListener(final Class notificationType, final NotificationListener listener) { - GenericNotificationRegistration _genericNotificationRegistration = new GenericNotificationRegistration(notificationType, listener, this); - final GenericNotificationRegistration reg = _genericNotificationRegistration; - this.listeners.put(notificationType, listener); - this.announceNotificationSubscription(notificationType); + addRegistrations(reg); return reg; } - public void announceNotificationSubscription(final Class notification) { - for (final ListenerRegistration listener : this.interestListeners) { - try { - NotificationInterestListener _instance = listener.getInstance(); - _instance.onNotificationSubscribtion(notification); - } catch (final Throwable _t) { - if (_t instanceof Exception) { - final Exception e = (Exception)_t; - String _message = e.getMessage(); - this.logger.error("", _message); - } else { - throw Exceptions.sneakyThrow(_t); - } - } - } - } - @Override - public Registration registerNotificationListener(final org.opendaylight.yangtools.yang.binding.NotificationListener listener) { + public ListenerRegistration registerNotificationListener(final org.opendaylight.yangtools.yang.binding.NotificationListener listener) { final NotificationInvoker invoker = SingletonHolder.INVOKER_FACTORY.invokerFor(listener); - Set> _supportedNotifications = invoker.getSupportedNotifications(); - for (final Class notifyType : _supportedNotifications) { - { - NotificationListener _invocationProxy = invoker.getInvocationProxy(); - this.listeners.put(notifyType, _invocationProxy); - this.announceNotificationSubscription(notifyType); - } + final Set> types = invoker.getSupportedNotifications(); + final NotificationListenerRegistration[] regs = new NotificationListenerRegistration[types.size()]; + + // Populate the registrations... + int i = 0; + for (Class type : types) { + regs[i] = new AggregatedNotificationListenerRegistration(type, invoker.getInvocationProxy(), regs) { + @Override + protected void removeRegistration() { + // Nothing to do, will be cleaned up by parent (below) + } + }; + ++i; } - GeneratedListenerRegistration _generatedListenerRegistration = new GeneratedListenerRegistration(listener, invoker, this); - final GeneratedListenerRegistration registration = _generatedListenerRegistration; - return (registration); - } - protected boolean unregisterListener(final GenericNotificationRegistration reg) { - Class _type = reg.getType(); - NotificationListener _instance = reg.getInstance(); - boolean _remove = this.listeners.remove(_type, _instance); - return _remove; - } + // ... now put them to use ... + addRegistrations(regs); - protected void unregisterListener(final GeneratedListenerRegistration reg) { - NotificationInvoker _invoker = reg.getInvoker(); - Set> _supportedNotifications = _invoker.getSupportedNotifications(); - for (final Class notifyType : _supportedNotifications) { - NotificationInvoker _invoker_1 = reg.getInvoker(); - NotificationListener _invocationProxy = _invoker_1.getInvocationProxy(); - this.listeners.remove(notifyType, _invocationProxy); - } + // ... finally return the parent registration + return new AbstractListenerRegistration(listener) { + @Override + protected void removeRegistration() { + listeners.removeRegistrations(regs); + for (ListenerRegistration reg : regs) { + reg.close(); + } + } + }; } @Override public void close() { } - @Override - public ListenerRegistration registerInterestListener(final NotificationInterestListener interestListener) { - final ListenerRegistration registration = this.interestListeners.register(interestListener); - Set> _keySet = this.listeners.keySet(); - for (final Class notification : _keySet) { - interestListener.onNotificationSubscribtion(notification); - } - return registration; - } }