/*
 * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved.
 *
 * This program and the accompanying materials are made available under the
 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
 * and is available at http://www.eclipse.org/legal/epl-v10.html
 */
8 package org.opendaylight.controller.sal.binding.impl;
10 import java.util.Collections;
12 import java.util.concurrent.ExecutorService;
13 import java.util.concurrent.Future;
15 import org.eclipse.xtext.xbase.lib.Conversions;
16 import org.eclipse.xtext.xbase.lib.Functions.Function1;
17 import org.eclipse.xtext.xbase.lib.IterableExtensions;
18 import org.opendaylight.controller.sal.binding.api.NotificationListener;
19 import org.opendaylight.controller.sal.binding.api.NotificationProviderService;
20 import org.opendaylight.controller.sal.binding.codegen.impl.SingletonHolder;
21 import org.opendaylight.controller.sal.binding.spi.NotificationInvokerFactory.NotificationInvoker;
22 import org.opendaylight.yangtools.concepts.ListenerRegistration;
23 import org.opendaylight.yangtools.concepts.Registration;
24 import org.opendaylight.yangtools.concepts.util.ListenerRegistry;
25 import org.opendaylight.yangtools.yang.binding.Notification;
26 import org.slf4j.Logger;
27 import org.slf4j.LoggerFactory;
29 import com.google.common.base.Preconditions;
30 import com.google.common.collect.HashMultimap;
31 import com.google.common.collect.ImmutableSet;
32 import com.google.common.collect.ImmutableSet.Builder;
33 import com.google.common.collect.Iterables;
34 import com.google.common.collect.Multimap;
35 import com.google.common.collect.Multimaps;
37 public class NotificationBrokerImpl implements NotificationProviderService, AutoCloseable {
38 private static final Logger LOG = LoggerFactory.getLogger(NotificationBrokerImpl.class);
40 private final ListenerRegistry<NotificationInterestListener> interestListeners =
41 ListenerRegistry.create();
43 private final Multimap<Class<? extends Notification>, NotificationListener<?>> listeners =
44 Multimaps.synchronizedSetMultimap(HashMultimap.<Class<? extends Notification>, NotificationListener<?>>create());
45 private ExecutorService executor;
/**
 * Creates a broker that delivers notifications on the given executor by default.
 *
 * @param executor executor used by {@link #publish(Notification)}; must not be null
 * @throws NullPointerException if {@code executor} is null
 */
public NotificationBrokerImpl(final ExecutorService executor) {
    // Assign the field directly instead of going through the public, overridable
    // setExecutor(): an override in a subclass would otherwise run before that subclass
    // has finished constructing.
    this.executor = Preconditions.checkNotNull(executor);
}
52 public void setExecutor(final ExecutorService executor) {
53 this.executor = Preconditions.checkNotNull(executor);
56 public Iterable<Class<?>> getNotificationTypes(final Notification notification) {
57 Class<?>[] _interfaces = notification.getClass().getInterfaces();
58 final Function1<Class<?>, Boolean> _function = new Function1<Class<?>, Boolean>() {
60 public Boolean apply(final Class<?> it) {
61 if (Notification.class.equals(it)) {
64 return Notification.class.isAssignableFrom(it);
67 return IterableExtensions.filter(((Iterable<Class<?>>)Conversions.doWrapArray(_interfaces)), _function);
71 public void publish(final Notification notification) {
72 this.publish(notification, executor);
76 public void publish(final Notification notification, final ExecutorService service) {
77 Iterable<NotificationListener<?>> listenerToNotify = Collections.emptySet();
78 for (final Class<?> type : getNotificationTypes(notification)) {
79 listenerToNotify = Iterables.concat(listenerToNotify, listeners.get(((Class<? extends Notification>) type)));
81 final Function1<NotificationListener<?>,NotifyTask> _function = new Function1<NotificationListener<?>, NotifyTask>() {
83 public NotifyTask apply(final NotificationListener<?> it) {
84 return new NotifyTask(it, notification);
87 final Set<NotifyTask> tasks = IterableExtensions.<NotifyTask>toSet(
88 IterableExtensions.<NotificationListener<?>, NotifyTask>map(listenerToNotify, _function));
89 this.submitAll(executor, tasks);
92 private ImmutableSet<Future<Object>> submitAll(final ExecutorService service, final Set<NotifyTask> tasks) {
93 final Builder<Future<Object>> ret = ImmutableSet.<Future<Object>>builder();
94 for (final NotifyTask task : tasks) {
95 ret.add(service.submit(task));
101 public <T extends Notification> Registration<NotificationListener<T>> registerNotificationListener(final Class<T> notificationType, final NotificationListener<T> listener) {
102 final GenericNotificationRegistration<T> reg = new GenericNotificationRegistration<T>(notificationType, listener, this);
103 this.listeners.put(notificationType, listener);
104 this.announceNotificationSubscription(notificationType);
108 private void announceNotificationSubscription(final Class<? extends Notification> notification) {
109 for (final ListenerRegistration<NotificationInterestListener> listener : interestListeners) {
111 listener.getInstance().onNotificationSubscribtion(notification);
112 } catch (Exception e) {
113 LOG.warn("Listener {} reported unexpected error on notification {}",
114 listener.getInstance(), notification, e);
120 public Registration<org.opendaylight.yangtools.yang.binding.NotificationListener> registerNotificationListener(final org.opendaylight.yangtools.yang.binding.NotificationListener listener) {
121 final NotificationInvoker invoker = SingletonHolder.INVOKER_FACTORY.invokerFor(listener);
122 for (final Class<? extends Notification> notifyType : invoker.getSupportedNotifications()) {
123 listeners.put(notifyType, invoker.getInvocationProxy());
124 announceNotificationSubscription(notifyType);
127 return new GeneratedListenerRegistration(listener, invoker, this);
130 protected boolean unregisterListener(final GenericNotificationRegistration<?> reg) {
131 return listeners.remove(reg.getType(), reg.getInstance());
134 protected void unregisterListener(final GeneratedListenerRegistration reg) {
135 final NotificationInvoker invoker = reg.getInvoker();
136 for (final Class<? extends Notification> notifyType : invoker.getSupportedNotifications()) {
137 this.listeners.remove(notifyType, invoker.getInvocationProxy());
/**
 * Closes this broker; the class declares {@link AutoCloseable}.
 * NOTE(review): confirm whether the executor or the registered listeners need to be released
 * here, and who owns the executor's lifecycle.
 */
142 public void close() {
146 public ListenerRegistration<NotificationInterestListener> registerInterestListener(final NotificationInterestListener interestListener) {
147 final ListenerRegistration<NotificationInterestListener> registration = this.interestListeners.register(interestListener);
148 for (final Class<? extends Notification> notification : listeners.keySet()) {
149 interestListener.onNotificationSubscribtion(notification);