*/
package org.opendaylight.controller.sal.binding.impl;
-import java.util.Collection;
-import java.util.Collections;
import java.util.Set;
import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Future;
-import org.eclipse.xtext.xbase.lib.Conversions;
-import org.eclipse.xtext.xbase.lib.Exceptions;
-import org.eclipse.xtext.xbase.lib.Functions.Function0;
-import org.eclipse.xtext.xbase.lib.Functions.Function1;
-import org.eclipse.xtext.xbase.lib.IterableExtensions;
import org.opendaylight.controller.sal.binding.api.NotificationListener;
import org.opendaylight.controller.sal.binding.api.NotificationProviderService;
import org.opendaylight.controller.sal.binding.codegen.impl.SingletonHolder;
import org.opendaylight.controller.sal.binding.spi.NotificationInvokerFactory.NotificationInvoker;
+import org.opendaylight.yangtools.concepts.AbstractListenerRegistration;
import org.opendaylight.yangtools.concepts.ListenerRegistration;
-import org.opendaylight.yangtools.concepts.Registration;
import org.opendaylight.yangtools.concepts.util.ListenerRegistry;
import org.opendaylight.yangtools.yang.binding.Notification;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.google.common.base.Objects;
-import com.google.common.collect.HashMultimap;
-import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.ImmutableSet.Builder;
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Multimap;
-import com.google.common.collect.Multimaps;
-import com.google.common.collect.SetMultimap;
+import com.google.common.base.Preconditions;
public class NotificationBrokerImpl implements NotificationProviderService, AutoCloseable {
- private final ListenerRegistry<NotificationInterestListener> interestListeners = new Function0<ListenerRegistry<NotificationInterestListener>>() {
- @Override
- public ListenerRegistry<NotificationInterestListener> apply() {
- ListenerRegistry<NotificationInterestListener> _create = ListenerRegistry.<NotificationInterestListener>create();
- return _create;
- }
- }.apply();
-
- private final Multimap<Class<? extends Notification>,NotificationListener<? extends Object>> listeners;
+ private static final Logger LOG = LoggerFactory.getLogger(NotificationBrokerImpl.class);
- private ExecutorService _executor;
+ private final ListenerRegistry<NotificationInterestListener> interestListeners =
+ ListenerRegistry.create();
+ private final GenerationalListenerMap listeners = new GenerationalListenerMap();
+ private final ExecutorService executor;
- public ExecutorService getExecutor() {
- return this._executor;
+ public NotificationBrokerImpl(final ExecutorService executor) {
+ this.executor = Preconditions.checkNotNull(executor);
}
- public void setExecutor(final ExecutorService executor) {
- this._executor = executor;
+ @Override
+ public void publish(final Notification notification) {
+ publish(notification, executor);
}
- private final Logger logger = new Function0<Logger>() {
- @Override
- public Logger apply() {
- Logger _logger = LoggerFactory.getLogger(NotificationBrokerImpl.class);
- return _logger;
+ @Override
+ public void publish(final Notification notification, final ExecutorService service) {
+ for (NotificationListenerRegistration<?> r : listeners.listenersFor(notification)) {
+ service.submit(new NotifyTask(r, notification));
}
- }.apply();
-
- public NotificationBrokerImpl() {
- HashMultimap<Class<? extends Notification>,NotificationListener<? extends Object>> _create = HashMultimap.<Class<? extends Notification>, NotificationListener<? extends Object>>create();
- SetMultimap<Class<? extends Notification>,NotificationListener<? extends Object>> _synchronizedSetMultimap = Multimaps.<Class<? extends Notification>, NotificationListener<? extends Object>>synchronizedSetMultimap(_create);
- this.listeners = _synchronizedSetMultimap;
}
- @Deprecated
- public NotificationBrokerImpl(final ExecutorService executor) {
- HashMultimap<Class<? extends Notification>,NotificationListener<? extends Object>> _create = HashMultimap.<Class<? extends Notification>, NotificationListener<? extends Object>>create();
- SetMultimap<Class<? extends Notification>,NotificationListener<? extends Object>> _synchronizedSetMultimap = Multimaps.<Class<? extends Notification>, NotificationListener<? extends Object>>synchronizedSetMultimap(_create);
- this.listeners = _synchronizedSetMultimap;
- this.setExecutor(executor);
+ private final void addRegistrations(final NotificationListenerRegistration<?>... registrations) {
+ listeners.addRegistrations(registrations);
+ for (NotificationListenerRegistration<?> reg : registrations) {
+ announceNotificationSubscription(reg.getType());
+ }
}
- public Iterable<Class<? extends Object>> getNotificationTypes(final Notification notification) {
- Class<? extends Notification> _class = notification.getClass();
- Class<? extends Object>[] _interfaces = _class.getInterfaces();
- final Function1<Class<? extends Object>,Boolean> _function = new Function1<Class<? extends Object>,Boolean>() {
- @Override
- public Boolean apply(final Class<? extends Object> it) {
- boolean _and = false;
- boolean _notEquals = (!Objects.equal(it, Notification.class));
- if (!_notEquals) {
- _and = false;
- } else {
- boolean _isAssignableFrom = Notification.class.isAssignableFrom(it);
- _and = (_notEquals && _isAssignableFrom);
- }
- return Boolean.valueOf(_and);
+ private void announceNotificationSubscription(final Class<? extends Notification> notification) {
+ for (final ListenerRegistration<NotificationInterestListener> listener : interestListeners) {
+ try {
+ listener.getInstance().onNotificationSubscribtion(notification);
+ } catch (Exception e) {
+ LOG.warn("Listener {} reported unexpected error on notification {}",
+ listener.getInstance(), notification, e);
}
- };
- Iterable<Class<? extends Object>> _filter = IterableExtensions.<Class<? extends Object>>filter(((Iterable<Class<? extends Object>>)Conversions.doWrapArray(_interfaces)), _function);
- return _filter;
+ }
}
@Override
- public void publish(final Notification notification) {
- ExecutorService _executor = this.getExecutor();
- this.publish(notification, _executor);
+ public ListenerRegistration<NotificationInterestListener> registerInterestListener(final NotificationInterestListener interestListener) {
+ final ListenerRegistration<NotificationInterestListener> registration = this.interestListeners.register(interestListener);
+ for (final Class<? extends Notification> notification : listeners.getKnownTypes()) {
+ interestListener.onNotificationSubscribtion(notification);
+ }
+ return registration;
}
@Override
- public void publish(final Notification notification, final ExecutorService service) {
- final Iterable<Class<? extends Object>> allTypes = this.getNotificationTypes(notification);
- Iterable<NotificationListener<? extends Object>> listenerToNotify = Collections.<NotificationListener<? extends Object>>emptySet();
- for (final Class<? extends Object> type : allTypes) {
- Collection<NotificationListener<? extends Object>> _get = this.listeners.get(((Class<? extends Notification>) type));
- Iterable<NotificationListener<? extends Object>> _plus = Iterables.<NotificationListener<? extends Object>>concat(listenerToNotify, _get);
- listenerToNotify = _plus;
- }
- final Function1<NotificationListener<? extends Object>,NotifyTask> _function = new Function1<NotificationListener<? extends Object>,NotifyTask>() {
+ public <T extends Notification> NotificationListenerRegistration<T> registerNotificationListener(final Class<T> notificationType, final NotificationListener<T> listener) {
+ final NotificationListenerRegistration<T> reg = new AbstractNotificationListenerRegistration<T>(notificationType, listener) {
@Override
- public NotifyTask apply(final NotificationListener<? extends Object> it) {
- NotifyTask _notifyTask = new NotifyTask(it, notification);
- return _notifyTask;
+ protected void removeRegistration() {
+ listeners.removeRegistrations(this);
}
};
- Iterable<NotifyTask> _map = IterableExtensions.<NotificationListener<? extends Object>, NotifyTask>map(listenerToNotify, _function);
- final Set<NotifyTask> tasks = IterableExtensions.<NotifyTask>toSet(_map);
- ExecutorService _executor = this.getExecutor();
- this.submitAll(_executor, tasks);
- }
-
- public ImmutableSet<Future<Object>> submitAll(final ExecutorService service, final Set<NotifyTask> tasks) {
- final Builder<Future<Object>> ret = ImmutableSet.<Future<Object>>builder();
- for (final NotifyTask task : tasks) {
- Future<Object> _submit = service.<Object>submit(task);
- ret.add(_submit);
- }
- return ret.build();
- }
- @Override
- public <T extends Notification> Registration<NotificationListener<T>> registerNotificationListener(final Class<T> notificationType, final NotificationListener<T> listener) {
- GenericNotificationRegistration<T> _genericNotificationRegistration = new GenericNotificationRegistration<T>(notificationType, listener, this);
- final GenericNotificationRegistration<T> reg = _genericNotificationRegistration;
- this.listeners.put(notificationType, listener);
- this.announceNotificationSubscription(notificationType);
+ addRegistrations(reg);
return reg;
}
- public void announceNotificationSubscription(final Class<? extends Notification> notification) {
- for (final ListenerRegistration<NotificationInterestListener> listener : this.interestListeners) {
- try {
- NotificationInterestListener _instance = listener.getInstance();
- _instance.onNotificationSubscribtion(notification);
- } catch (final Throwable _t) {
- if (_t instanceof Exception) {
- final Exception e = (Exception)_t;
- String _message = e.getMessage();
- this.logger.error("", _message);
- } else {
- throw Exceptions.sneakyThrow(_t);
- }
- }
- }
- }
-
@Override
- public Registration<org.opendaylight.yangtools.yang.binding.NotificationListener> registerNotificationListener(final org.opendaylight.yangtools.yang.binding.NotificationListener listener) {
+ public ListenerRegistration<org.opendaylight.yangtools.yang.binding.NotificationListener> registerNotificationListener(final org.opendaylight.yangtools.yang.binding.NotificationListener listener) {
final NotificationInvoker invoker = SingletonHolder.INVOKER_FACTORY.invokerFor(listener);
- Set<Class<? extends Notification>> _supportedNotifications = invoker.getSupportedNotifications();
- for (final Class<? extends Notification> notifyType : _supportedNotifications) {
- {
- NotificationListener<Notification> _invocationProxy = invoker.getInvocationProxy();
- this.listeners.put(notifyType, _invocationProxy);
- this.announceNotificationSubscription(notifyType);
- }
+ final Set<Class<? extends Notification>> types = invoker.getSupportedNotifications();
+ final NotificationListenerRegistration<?>[] regs = new NotificationListenerRegistration<?>[types.size()];
+
+ // Populate the registrations...
+ int i = 0;
+ for (Class<? extends Notification> type : types) {
+ regs[i] = new AggregatedNotificationListenerRegistration<Notification, Object>(type, invoker.getInvocationProxy(), regs) {
+ @Override
+ protected void removeRegistration() {
+ // Nothing to do, will be cleaned up by parent (below)
+ }
+ };
+ ++i;
}
- GeneratedListenerRegistration _generatedListenerRegistration = new GeneratedListenerRegistration(listener, invoker, this);
- final GeneratedListenerRegistration registration = _generatedListenerRegistration;
- return (registration);
- }
- protected boolean unregisterListener(final GenericNotificationRegistration<? extends Object> reg) {
- Class<? extends Notification> _type = reg.getType();
- NotificationListener<? extends Notification> _instance = reg.getInstance();
- boolean _remove = this.listeners.remove(_type, _instance);
- return _remove;
- }
+ // ... now put them to use ...
+ addRegistrations(regs);
- protected void unregisterListener(final GeneratedListenerRegistration reg) {
- NotificationInvoker _invoker = reg.getInvoker();
- Set<Class<? extends Notification>> _supportedNotifications = _invoker.getSupportedNotifications();
- for (final Class<? extends Notification> notifyType : _supportedNotifications) {
- NotificationInvoker _invoker_1 = reg.getInvoker();
- NotificationListener<Notification> _invocationProxy = _invoker_1.getInvocationProxy();
- this.listeners.remove(notifyType, _invocationProxy);
- }
+ // ... finally return the parent registration
+ return new AbstractListenerRegistration<org.opendaylight.yangtools.yang.binding.NotificationListener>(listener) {
+ @Override
+ protected void removeRegistration() {
+ listeners.removeRegistrations(regs);
+ for (ListenerRegistration<?> reg : regs) {
+ reg.close();
+ }
+ }
+ };
}
@Override
public void close() {
}
- @Override
- public ListenerRegistration<NotificationInterestListener> registerInterestListener(final NotificationInterestListener interestListener) {
- final ListenerRegistration<NotificationInterestListener> registration = this.interestListeners.register(interestListener);
- Set<Class<? extends Notification>> _keySet = this.listeners.keySet();
- for (final Class<? extends Notification> notification : _keySet) {
- interestListener.onNotificationSubscribtion(notification);
- }
- return registration;
- }
}