/**
 * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
 *
 * This program and the accompanying materials are made available under the
 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
 * and is available at http://www.eclipse.org/legal/epl-v10.html
 */
package org.opendaylight.controller.sal.binding.impl;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;

import org.eclipse.xtext.xbase.lib.Conversions;
import org.eclipse.xtext.xbase.lib.Functions.Function1;
import org.eclipse.xtext.xbase.lib.IterableExtensions;
import org.opendaylight.controller.sal.binding.api.NotificationListener;
import org.opendaylight.controller.sal.binding.api.NotificationProviderService;
import org.opendaylight.controller.sal.binding.codegen.impl.SingletonHolder;
import org.opendaylight.controller.sal.binding.spi.NotificationInvokerFactory.NotificationInvoker;
import org.opendaylight.yangtools.concepts.ListenerRegistration;
import org.opendaylight.yangtools.concepts.Registration;
import org.opendaylight.yangtools.concepts.util.ListenerRegistry;
import org.opendaylight.yangtools.yang.binding.Notification;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.common.base.Objects;
import com.google.common.base.Preconditions;
import com.google.common.collect.HashMultimap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.ImmutableSet.Builder;
import com.google.common.collect.Iterables;
import com.google.common.collect.Multimap;
import com.google.common.collect.Multimaps;
import com.google.common.collect.SetMultimap;

/**
 * Notification broker that keeps a map from notification type to registered
 * listeners and dispatches each published notification to the matching
 * listeners by submitting one {@link NotifyTask} per listener to an
 * {@link ExecutorService}.
 *
 * <p>The listener map is a Guava synchronized multimap. Single operations on it
 * are synchronized by the wrapper; code that iterates one of its collection
 * views must hold the multimap's own lock, which this class does by taking a
 * snapshot inside a {@code synchronized (listeners)} block.
 */
public class NotificationBrokerImpl implements NotificationProviderService, AutoCloseable {
    private static final Logger LOG = LoggerFactory.getLogger(NotificationBrokerImpl.class);

    /** Parties that want to be told whenever a notification type gains a subscriber. */
    private final ListenerRegistry<NotificationInterestListener> interestListeners = ListenerRegistry.create();

    /** Registered listeners keyed by the notification type they subscribed to. */
    private final Multimap<Class<? extends Notification>, NotificationListener<?>> listeners;

    /** Default executor used by {@link #publish(Notification)}; unset until {@link #setExecutor} is called. */
    private ExecutorService executor;

    /**
     * Creates a broker without an executor. {@link #setExecutor(ExecutorService)}
     * has to be called before notifications can be delivered through
     * {@link #publish(Notification)}.
     */
    public NotificationBrokerImpl() {
        this.listeners = newListenerMap();
    }

    /**
     * Creates a broker that uses the supplied executor by default.
     *
     * @param executor default executor, must not be null
     * @throws NullPointerException if {@code executor} is null
     * @deprecated use the no-argument constructor followed by
     *             {@link #setExecutor(ExecutorService)}
     */
    @Deprecated
    public NotificationBrokerImpl(final ExecutorService executor) {
        this.listeners = newListenerMap();
        this.setExecutor(executor);
    }

    /** Builds the synchronized, set-valued multimap backing {@link #listeners}. */
    private static SetMultimap<Class<? extends Notification>, NotificationListener<?>> newListenerMap() {
        final HashMultimap<Class<? extends Notification>, NotificationListener<?>> backing =
                HashMultimap.<Class<? extends Notification>, NotificationListener<?>>create();
        return Multimaps.<Class<? extends Notification>, NotificationListener<?>>synchronizedSetMultimap(backing);
    }

    /**
     * Sets the default executor used to deliver notifications.
     *
     * @param executor executor to use, must not be null
     * @throws NullPointerException if {@code executor} is null
     */
    public void setExecutor(final ExecutorService executor) {
        this.executor = Preconditions.checkNotNull(executor);
    }

    /**
     * Returns the interfaces directly implemented by the notification's class
     * that are sub-types of {@link Notification}, excluding {@link Notification}
     * itself.
     *
     * @param notification notification instance to inspect
     * @return the matching interfaces
     */
    public Iterable<Class<?>> getNotificationTypes(final Notification notification) {
        final Class<?>[] interfaces = notification.getClass().getInterfaces();
        final Function1<Class<?>, Boolean> isNotificationSubtype = new Function1<Class<?>, Boolean>() {
            @Override
            public Boolean apply(final Class<?> it) {
                return Boolean.valueOf(!Objects.equal(it, Notification.class)
                        && Notification.class.isAssignableFrom(it));
            }
        };
        // doWrapArray returns Object; the wrapped array is a Class<?>[], so the cast is safe.
        @SuppressWarnings("unchecked")
        final Iterable<Class<?>> candidates = (Iterable<Class<?>>) Conversions.doWrapArray(interfaces);
        return IterableExtensions.<Class<?>>filter(candidates, isNotificationSubtype);
    }

    /**
     * Publishes the notification using the broker's default executor.
     *
     * @param notification notification to deliver
     */
    @Override
    public void publish(final Notification notification) {
        this.publish(notification, executor);
    }

    /**
     * Publishes the notification to every listener registered for one of its
     * notification types, submitting one task per listener.
     *
     * @param notification notification to deliver
     * @param service executor to run the listener callbacks on; when null the
     *        broker's default executor is used instead
     */
    @Override
    public void publish(final Notification notification, final ExecutorService service) {
        // Honour the caller-supplied executor (it used to be silently ignored); keep the
        // old behaviour for callers that passed null by falling back to the default.
        final ExecutorService target = service != null ? service : executor;

        final Iterable<Class<?>> allTypes = this.getNotificationTypes(notification);
        final List<NotificationListener<?>> listenerToNotify = new ArrayList<NotificationListener<?>>();
        // get(..) returns a live view of the synchronized multimap; Guava requires holding
        // the multimap's lock while iterating such views, so copy them out under the lock.
        synchronized (listeners) {
            for (final Class<?> type : allTypes) {
                // getNotificationTypes only yields sub-types of Notification, so asSubclass cannot fail.
                final Collection<NotificationListener<?>> registered =
                        this.listeners.get(type.asSubclass(Notification.class));
                listenerToNotify.addAll(registered);
            }
        }

        final Function1<NotificationListener<?>, NotifyTask> toTask = new Function1<NotificationListener<?>, NotifyTask>() {
            @Override
            public NotifyTask apply(final NotificationListener<?> it) {
                return new NotifyTask(it, notification);
            }
        };
        final Iterable<NotifyTask> mapped =
                IterableExtensions.<NotificationListener<?>, NotifyTask>map(listenerToNotify, toTask);
        final Set<NotifyTask> tasks = IterableExtensions.<NotifyTask>toSet(mapped);
        this.submitAll(target, tasks);
    }

    /**
     * Submits every task to the given executor.
     *
     * @param service executor to submit to
     * @param tasks tasks to submit
     * @return the futures returned by the executor, one per submitted task
     */
    public ImmutableSet<Future<Object>> submitAll(final ExecutorService service, final Set<NotifyTask> tasks) {
        final Builder<Future<Object>> ret = ImmutableSet.<Future<Object>>builder();
        for (final NotifyTask task : tasks) {
            final Future<Object> future = service.submit(task);
            ret.add(future);
        }
        return ret.build();
    }

    /**
     * Registers a listener for a single notification type and announces the
     * subscription to all interest listeners.
     *
     * @param notificationType type the listener subscribes to
     * @param listener listener to register
     * @return registration handle for the listener
     */
    @Override
    public <T extends Notification> Registration<NotificationListener<T>> registerNotificationListener(
            final Class<T> notificationType, final NotificationListener<T> listener) {
        final GenericNotificationRegistration<T> reg =
                new GenericNotificationRegistration<T>(notificationType, listener, this);
        this.listeners.put(notificationType, listener);
        this.announceNotificationSubscription(notificationType);
        return reg;
    }

    /**
     * Tells every interest listener that the given notification type has been
     * subscribed to. A failure in one interest listener is logged and does not
     * prevent the remaining ones from being called.
     */
    private void announceNotificationSubscription(final Class<? extends Notification> notification) {
        for (final ListenerRegistration<NotificationInterestListener> listener : interestListeners) {
            try {
                listener.getInstance().onNotificationSubscribtion(notification);
            } catch (Exception e) {
                LOG.warn("Listener {} reported unexpected error on notification {}", listener.getInstance(),
                        notification, e);
            }
        }
    }

    /**
     * Registers a generated (YANG binding) listener: its invocation proxy is
     * registered for every notification type the invoker reports as supported,
     * and each of those subscriptions is announced to the interest listeners.
     *
     * @param listener generated listener to register
     * @return registration handle for the listener
     */
    @Override
    public Registration<org.opendaylight.yangtools.yang.binding.NotificationListener> registerNotificationListener(
            final org.opendaylight.yangtools.yang.binding.NotificationListener listener) {
        final NotificationInvoker invoker = SingletonHolder.INVOKER_FACTORY.invokerFor(listener);
        final Set<Class<? extends Notification>> supported = invoker.getSupportedNotifications();
        for (final Class<? extends Notification> notifyType : supported) {
            this.listeners.put(notifyType, invoker.getInvocationProxy());
            this.announceNotificationSubscription(notifyType);
        }
        final GeneratedListenerRegistration registration = new GeneratedListenerRegistration(listener, invoker, this);
        return registration;
    }

    /**
     * Removes the listener held by the given registration from the type it was
     * registered for.
     *
     * @param reg registration to undo
     * @return {@code true} if the listener map changed
     */
    protected boolean unregisterListener(final GenericNotificationRegistration<?> reg) {
        return this.listeners.remove(reg.getType(), reg.getInstance());
    }

    /**
     * Removes the invocation proxy of a generated listener from every
     * notification type its invoker supports.
     *
     * @param reg registration to undo
     */
    protected void unregisterListener(final GeneratedListenerRegistration reg) {
        final NotificationInvoker invoker = reg.getInvoker();
        for (final Class<? extends Notification> notifyType : invoker.getSupportedNotifications()) {
            this.listeners.remove(notifyType, invoker.getInvocationProxy());
        }
    }

    /** Currently does nothing; in particular the executor is not shut down here. */
    @Override
    public void close() {
    }

    /**
     * Registers an interest listener and immediately replays to it every
     * notification type that currently has at least one listener.
     *
     * @param interestListener listener to register
     * @return registration handle for the interest listener
     */
    @Override
    public ListenerRegistration<NotificationInterestListener> registerInterestListener(
            final NotificationInterestListener interestListener) {
        final ListenerRegistration<NotificationInterestListener> registration =
                this.interestListeners.register(interestListener);
        // keySet() is a live view of the synchronized multimap: snapshot it under the
        // multimap's lock, then call the listener without holding the lock.
        final Set<Class<? extends Notification>> subscribed;
        synchronized (listeners) {
            subscribed = ImmutableSet.copyOf(this.listeners.keySet());
        }
        for (final Class<? extends Notification> notification : subscribed) {
            interestListener.onNotificationSubscribtion(notification);
        }
        return registration;
    }
}