/**
 * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved.
 *
 * This program and the accompanying materials are made available under the
 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
 * and is available at http://www.eclipse.org/legal/epl-v10.html
 */
package org.opendaylight.controller.sal.binding.impl;

import java.util.Collections;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;

import org.eclipse.xtext.xbase.lib.Conversions;
import org.eclipse.xtext.xbase.lib.Functions.Function1;
import org.eclipse.xtext.xbase.lib.IterableExtensions;
import org.opendaylight.controller.sal.binding.api.NotificationListener;
import org.opendaylight.controller.sal.binding.api.NotificationProviderService;
import org.opendaylight.controller.sal.binding.codegen.impl.SingletonHolder;
import org.opendaylight.controller.sal.binding.spi.NotificationInvokerFactory.NotificationInvoker;
import org.opendaylight.yangtools.concepts.ListenerRegistration;
import org.opendaylight.yangtools.concepts.Registration;
import org.opendaylight.yangtools.concepts.util.ListenerRegistry;
import org.opendaylight.yangtools.yang.binding.Notification;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.common.base.Preconditions;
import com.google.common.collect.HashMultimap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.ImmutableSet.Builder;
import com.google.common.collect.Iterables;
import com.google.common.collect.Multimap;
import com.google.common.collect.Multimaps;

/**
 * Notification broker that keeps a registry of listeners keyed by notification
 * type and delivers published notifications to them through an
 * {@link ExecutorService}.
 *
 * <p>Listener bookkeeping lives in a synchronized multimap. Single operations on
 * it are safe as-is; any iteration over one of its views is performed while
 * holding the multimap's monitor, and listener callbacks are always invoked
 * outside of that monitor.
 */
public class NotificationBrokerImpl implements NotificationProviderService, AutoCloseable {
    private static final Logger LOG = LoggerFactory.getLogger(NotificationBrokerImpl.class);

    /** Parties that want to be told whenever somebody subscribes to a notification type. */
    private final ListenerRegistry<NotificationInterestListener> interestListeners =
            ListenerRegistry.create();

    /** Registered listeners, keyed by the notification interface they subscribed to. */
    private final Multimap<Class<? extends Notification>, NotificationListener<?>> listeners =
            Multimaps.synchronizedSetMultimap(
                    HashMultimap.<Class<? extends Notification>, NotificationListener<?>>create());

    /** Default executor used by {@link #publish(Notification)}. */
    private ExecutorService executor;

    /**
     * Creates a broker that delivers notifications on the supplied executor.
     *
     * @param executor default executor, must not be null
     * @throws NullPointerException if {@code executor} is null
     */
    @Deprecated
    public NotificationBrokerImpl(final ExecutorService executor) {
        this.setExecutor(executor);
    }

    /**
     * Replaces the default executor.
     *
     * @param executor new default executor, must not be null
     * @throws NullPointerException if {@code executor} is null
     */
    public void setExecutor(final ExecutorService executor) {
        this.executor = Preconditions.checkNotNull(executor);
    }

    /**
     * Returns the interfaces directly implemented by the notification's class
     * that are sub-interfaces of {@link Notification}. The {@code Notification}
     * interface itself is excluded.
     *
     * @param notification notification instance to inspect
     * @return the matching interfaces, filtered lazily
     */
    public Iterable<Class<?>> getNotificationTypes(final Notification notification) {
        final Class<?>[] implemented = notification.getClass().getInterfaces();
        final Function1<Class<?>, Boolean> isNotificationSubtype = new Function1<Class<?>, Boolean>() {
            @Override
            public Boolean apply(final Class<?> it) {
                if (Notification.class.equals(it)) {
                    return false;
                }
                return Notification.class.isAssignableFrom(it);
            }
        };
        // doWrapArray is declared to return Object, hence the unchecked cast.
        @SuppressWarnings("unchecked")
        final Iterable<Class<?>> wrapped = (Iterable<Class<?>>) Conversions.doWrapArray(implemented);
        return IterableExtensions.filter(wrapped, isNotificationSubtype);
    }

    /**
     * Publishes the notification using the broker's default executor.
     *
     * @param notification notification to deliver
     */
    @Override
    public void publish(final Notification notification) {
        this.publish(notification, executor);
    }

    /**
     * Publishes the notification to every listener registered for one of its
     * notification types, running each delivery as a task on {@code service}.
     *
     * @param notification notification to deliver
     * @param service executor on which the delivery tasks are submitted; when
     *        null, the broker's default executor is used instead
     */
    @Override
    public void publish(final Notification notification, final ExecutorService service) {
        final Builder<NotifyTask> tasks = ImmutableSet.<NotifyTask>builder();
        // A synchronized multimap only guards individual calls; iterating one of its
        // views requires holding the multimap's own monitor. Only the cheap task
        // construction happens under the lock, never a listener callback.
        synchronized (listeners) {
            for (final Class<?> type : getNotificationTypes(notification)) {
                // getNotificationTypes only yields sub-interfaces of Notification.
                @SuppressWarnings("unchecked")
                final Class<? extends Notification> key = (Class<? extends Notification>) type;
                for (final NotificationListener<?> listener : listeners.get(key)) {
                    tasks.add(new NotifyTask(listener, notification));
                }
            }
        }
        // Honour the caller-supplied executor; fall back to the default when none is given.
        final ExecutorService target = service != null ? service : executor;
        this.submitAll(target, tasks.build());
    }

    /**
     * Submits every task to the given executor.
     *
     * @param service executor receiving the tasks
     * @param tasks tasks to submit
     * @return the futures returned by the executor, one per submitted task
     */
    private ImmutableSet<Future<Object>> submitAll(final ExecutorService service, final Set<NotifyTask> tasks) {
        final Builder<Future<Object>> ret = ImmutableSet.<Future<Object>>builder();
        for (final NotifyTask task : tasks) {
            ret.add(service.submit(task));
        }
        return ret.build();
    }

    /**
     * Registers a listener for a single notification type and announces the
     * subscription to all interest listeners.
     *
     * @param notificationType notification interface the listener subscribes to
     * @param listener listener to register
     * @return registration object for the newly added listener
     */
    @Override
    public <T extends Notification> Registration<NotificationListener<T>> registerNotificationListener(
            final Class<T> notificationType, final NotificationListener<T> listener) {
        final GenericNotificationRegistration<T> reg =
                new GenericNotificationRegistration<T>(notificationType, listener, this);
        this.listeners.put(notificationType, listener);
        this.announceNotificationSubscription(notificationType);
        return reg;
    }

    /**
     * Tells every registered interest listener that a subscription for the given
     * notification type exists. A failure in one interest listener is logged and
     * does not prevent the remaining ones from being informed.
     *
     * @param notification notification type that has been subscribed to
     */
    private void announceNotificationSubscription(final Class<? extends Notification> notification) {
        for (final ListenerRegistration<NotificationInterestListener> listener : interestListeners) {
            try {
                listener.getInstance().onNotificationSubscribtion(notification);
            } catch (Exception e) {
                LOG.warn("Listener {} reported unexpected error on notification {}",
                        listener.getInstance(), notification, e);
            }
        }
    }

    /**
     * Registers a generated (YANG binding) listener. An invoker is obtained for
     * the listener and its invocation proxy is registered under every
     * notification type the invoker reports as supported; each of those
     * subscriptions is announced to the interest listeners.
     *
     * @param listener generated listener implementation to register
     * @return registration object for the listener
     */
    @Override
    public Registration<org.opendaylight.yangtools.yang.binding.NotificationListener> registerNotificationListener(
            final org.opendaylight.yangtools.yang.binding.NotificationListener listener) {
        final NotificationInvoker invoker = SingletonHolder.INVOKER_FACTORY.invokerFor(listener);
        for (final Class<? extends Notification> notifyType : invoker.getSupportedNotifications()) {
            listeners.put(notifyType, invoker.getInvocationProxy());
            announceNotificationSubscription(notifyType);
        }

        return new GeneratedListenerRegistration(listener, invoker, this);
    }

    /**
     * Removes the listener held by a single-type registration.
     *
     * @param reg registration being withdrawn
     * @return {@code true} if the listener registry changed as a result
     */
    protected boolean unregisterListener(final GenericNotificationRegistration<?> reg) {
        return listeners.remove(reg.getType(), reg.getInstance());
    }

    /**
     * Removes the invocation proxy of a generated-listener registration from
     * every notification type its invoker supports.
     *
     * @param reg registration being withdrawn
     */
    protected void unregisterListener(final GeneratedListenerRegistration reg) {
        final NotificationInvoker invoker = reg.getInvoker();
        for (final Class<? extends Notification> notifyType : invoker.getSupportedNotifications()) {
            this.listeners.remove(notifyType, invoker.getInvocationProxy());
        }
    }

    /**
     * Does nothing; in particular, the executor is not shut down and registered
     * listeners are left in place.
     */
    @Override
    public void close() {
    }

    /**
     * Registers an interest listener and immediately replays to it every
     * notification type that currently has at least one listener.
     *
     * @param interestListener listener to be informed about subscriptions
     * @return registration object for the interest listener
     */
    @Override
    public ListenerRegistration<NotificationInterestListener> registerInterestListener(
            final NotificationInterestListener interestListener) {
        final ListenerRegistration<NotificationInterestListener> registration =
                this.interestListeners.register(interestListener);

        // Copy the key view while holding the multimap's monitor, then call back
        // outside of it so the listener never runs under the broker's lock.
        final Set<Class<? extends Notification>> subscribedTypes;
        synchronized (listeners) {
            subscribedTypes = ImmutableSet.copyOf(listeners.keySet());
        }
        for (final Class<? extends Notification> notification : subscribedTypes) {
            interestListener.onNotificationSubscribtion(notification);
        }
        return registration;
    }
}