/*
 * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved.
 *
 * This program and the accompanying materials are made available under the
 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
 * and is available at http://www.eclipse.org/legal/epl-v10.html
 */
\r
8 package org.opendaylight.controller.sal.binding.impl
\r
10 import com.google.common.collect.HashMultimap
\r
11 import com.google.common.collect.ImmutableSet
\r
12 import com.google.common.collect.Multimap
\r
13 import com.google.common.collect.Multimaps
\r
14 import java.util.Collections
\r
15 import java.util.concurrent.Callable
\r
16 import java.util.concurrent.ExecutorService
\r
17 import java.util.concurrent.Future
\r
18 import java.util.Set
\r
19 import org.opendaylight.controller.sal.binding.api.NotificationListener
\r
20 import org.opendaylight.controller.sal.binding.api.NotificationProviderService
\r
21 import org.opendaylight.controller.sal.binding.api.NotificationProviderService.NotificationInterestListener
\r
22 import org.opendaylight.controller.sal.binding.codegen.impl.SingletonHolder
\r
23 import org.opendaylight.controller.sal.binding.spi.NotificationInvokerFactory.NotificationInvoker
\r
24 import org.opendaylight.yangtools.concepts.AbstractObjectRegistration
\r
25 import org.opendaylight.yangtools.concepts.ListenerRegistration
\r
26 import org.opendaylight.yangtools.concepts.Registration
\r
27 import org.opendaylight.yangtools.concepts.util.ListenerRegistry
\r
28 import org.opendaylight.yangtools.yang.binding.Notification
\r
29 import org.slf4j.LoggerFactory
\r
31 class NotificationBrokerImpl implements NotificationProviderService, AutoCloseable {
\r
33 val ListenerRegistry<NotificationInterestListener> interestListeners = ListenerRegistry.create;
\r
35 val Multimap<Class<? extends Notification>, NotificationListener<?>> listeners;
\r
38 var ExecutorService executor;
\r
40 val logger = LoggerFactory.getLogger(NotificationBrokerImpl)
\r
/**
 * Creates a broker whose listener registry is an empty, synchronized
 * set-multimap. No executor is assigned by this constructor.
 */
new() {
    listeners = Multimaps.synchronizedSetMultimap(HashMultimap.create())
}

/**
 * Creates a broker whose listener registry is an empty, synchronized
 * set-multimap and stores the supplied executor for later use by
 * {@code publish(Notification)}.
 *
 * @param executor executor assigned to this broker
 */
new(ExecutorService executor) {
    this.executor = executor
    listeners = Multimaps.synchronizedSetMultimap(HashMultimap.create())
}
\r
/**
 * Stores the listener in the registry under the given notification type.
 *
 * @param notificationType notification class used as the registry key
 * @param listener listener stored under that key
 */
override <T extends Notification> addNotificationListener(
    Class<T> notificationType,
    NotificationListener<T> listener
) {
    this.listeners.put(notificationType, listener)
}
\r
/**
 * Removes the (type, listener) pair from the registry.
 *
 * @param notificationType notification class the listener was stored under
 * @param listener listener to drop for that type
 */
override <T extends Notification> removeNotificationListener(
    Class<T> notificationType,
    NotificationListener<T> listener
) {
    this.listeners.remove(notificationType, listener)
}
\r
/**
 * Forwards the notification to {@code publish(Notification)}.
 *
 * @param notification notification to deliver
 */
override notify(Notification notification) {
    this.publish(notification)
}
\r
/**
 * Lists the interfaces directly implemented by the notification's class
 * that are assignable to {@code Notification}, excluding {@code Notification}
 * itself.
 *
 * @param notification notification whose class is inspected
 * @return the matching interfaces as a filtered view
 */
def getNotificationTypes(Notification notification) {
    val declaredInterfaces = notification.class.interfaces
    declaredInterfaces.filter [ candidate |
        Notification.isAssignableFrom(candidate) && candidate != Notification
    ]
}
\r
/**
 * Unsupported variant; always throws.
 *
 * @param listener ignored
 * @throws UnsupportedOperationException on every call
 */
override addNotificationListener(org.opendaylight.yangtools.yang.binding.NotificationListener listener) {
    throw new UnsupportedOperationException(
        "Deprecated method. Use registerNotificationListener instead."
    )
}
\r
/**
 * Unsupported variant; always throws.
 *
 * @param listener ignored
 * @throws UnsupportedOperationException on every call
 */
override removeNotificationListener(org.opendaylight.yangtools.yang.binding.NotificationListener listener) {
    val message = "Deprecated method. Use RegisterNotificationListener returned value to close registration."
    throw new UnsupportedOperationException(message)
}
\r
/**
 * Forwards the notification and executor to
 * {@code publish(Notification, ExecutorService)}.
 *
 * @param notification notification to deliver
 * @param service executor handed through to publish
 */
override notify(Notification notification, ExecutorService service) {
    this.publish(notification, service)
}
\r
/**
 * Publishes the notification using this broker's own executor.
 *
 * @param notification notification to deliver
 */
override publish(Notification notification) {
    val defaultExecutor = executor
    publish(notification, defaultExecutor)
}
\r
/**
 * Delivers the notification to every listener registered for any of its
 * notification types, wrapping each delivery in a {@code NotifyTask} and
 * submitting it to an executor.
 *
 * @param notification notification to deliver
 * @param service executor the delivery tasks are submitted to; when
 *        {@code null}, this broker's own executor is used instead
 */
override publish(Notification notification, ExecutorService service) {
    // Previously the broker's own executor was always used and the
    // caller-supplied one was silently ignored.
    val targetExecutor = service ?: executor
    val allTypes = notification.notificationTypes

    // Lazily concatenate the listener sets of every matching type; toSet
    // below collapses tasks that compare equal.
    var Iterable<NotificationListener<?>> listenerToNotify = Collections.emptySet();
    for (type : allTypes) {
        listenerToNotify = listenerToNotify + listeners.get(type as Class<? extends Notification>)
    }
    val tasks = listenerToNotify.map[new NotifyTask(it, notification)].toSet;
    submitAll(targetExecutor, tasks);
}
\r
/**
 * Submits every task to the given executor.
 *
 * @param service executor receiving the tasks
 * @param tasks tasks to submit
 * @return immutable set of the futures returned by the executor
 */
def submitAll(ExecutorService service, Set<NotifyTask> tasks) {
    val futures = ImmutableSet.<Future<Object>>builder()
    tasks.forEach[pending | futures.add(service.submit(pending))]
    return futures.build()
}
\r
/**
 * Registers the listener for the given notification type and announces the
 * new subscription to the interest listeners.
 *
 * @param notificationType notification class the listener is stored under
 * @param listener listener to register
 * @return registration object created for this (type, listener) pair
 */
override <T extends Notification> registerNotificationListener(
    Class<T> notificationType,
    NotificationListener<T> listener
) {
    val registration = new GenericNotificationRegistration<T>(notificationType, listener, this)
    listeners.put(notificationType, listener)
    announceNotificationSubscription(notificationType)
    // NOTE(review): the return statement was not visible in the extracted
    // listing; returning the registration is assumed — confirm.
    return registration
}
\r
/**
 * Tells every registered interest listener that a subscription for the
 * given notification type exists. A listener that throws is logged and
 * skipped so the remaining listeners are still informed.
 *
 * @param notification notification type that was subscribed to
 */
def announceNotificationSubscription(Class<? extends Notification> notification) {
    for (listener : interestListeners) {
        try {
            listener.instance.onNotificationSubscribtion(notification);
        } catch (Exception e) {
            // Pass the throwable itself so the stack trace is kept; the
            // previous call logged an empty message and dropped the exception.
            logger.error("Unhandled exception thrown by interest listener: {}", listener.instance, e)
        }
    }
}
\r
/**
 * Registers a generated-style listener: obtains an invoker for it, stores
 * the invoker's proxy under each notification type the invoker supports,
 * and announces each of those subscriptions.
 *
 * @param listener listener to register
 * @return registration wrapping the listener and its invoker
 */
override registerNotificationListener(
    org.opendaylight.yangtools.yang.binding.NotificationListener listener
) {
    val invoker = SingletonHolder.INVOKER_FACTORY.invokerFor(listener)
    invoker.supportedNotifications.forEach [ notifyType |
        listeners.put(notifyType, invoker.invocationProxy)
        announceNotificationSubscription(notifyType)
    ]
    return new GeneratedListenerRegistration(listener, invoker, this) as Registration<org.opendaylight.yangtools.yang.binding.NotificationListener>
}
\r
/**
 * Removes the (type, listener) pair held by the registration from the
 * registry.
 *
 * @param reg registration being removed
 */
protected def unregisterListener(GenericNotificationRegistration<?> reg) {
    val registeredType = reg.type
    val registeredListener = reg.instance
    listeners.remove(registeredType, registeredListener)
}
\r
/**
 * Removes the invoker's proxy from the registry for every notification
 * type the registration's invoker supports.
 *
 * @param reg registration being removed
 */
protected def unregisterListener(GeneratedListenerRegistration reg) {
    reg.invoker.supportedNotifications.forEach [ notifyType |
        listeners.remove(notifyType, reg.invoker.invocationProxy)
    ]
}
\r
152 //FIXME: implement properly.
\r
/**
 * Registers an interest listener and immediately reports to it every
 * notification type that currently has an entry in the listener registry.
 *
 * @param interestListener listener to register
 * @return registration returned by the interest-listener registry
 */
override registerInterestListener(NotificationInterestListener interestListener) {
    val handle = interestListeners.register(interestListener)

    listeners.keySet.forEach [ subscribedType |
        interestListener.onNotificationSubscribtion(subscribedType)
    ]
    return handle
}
\r
/**
 * Registration object for a listener registered under a single notification
 * type. Removing the registration asks the owning broker to unregister the
 * listener and then drops the reference to the broker.
 */
class GenericNotificationRegistration<T extends Notification> extends AbstractObjectRegistration<NotificationListener<T>> implements ListenerRegistration<NotificationListener<T>> {

    // Notification type the listener was registered for; the broker reads it
    // as reg.type when unregistering.
    // NOTE(review): this property and the super(...) / _type assignments below
    // were not visible in the extracted listing; restored by analogy with
    // GeneratedListenerRegistration — confirm against the original file.
    @Property
    val Class<T> type;

    // Broker that owns this registration; set to null after removal.
    var NotificationBrokerImpl notificationBroker;

    /**
     * @param type notification type the listener is registered for
     * @param instance the registered listener
     * @param broker broker that owns the registration
     */
    public new(Class<T> type, NotificationListener<T> instance, NotificationBrokerImpl broker) {
        super(instance);
        _type = type;
        notificationBroker = broker;
    }

    /** Unregisters this registration from the broker and releases the broker reference. */
    override protected removeRegistration() {
        notificationBroker.unregisterListener(this);
        notificationBroker = null;
    }
}
\r
/**
 * Registration object for a generated-style listener together with the
 * invoker created for it. Removing the registration asks the owning broker
 * to unregister it and then drops the reference to the broker.
 */
class GeneratedListenerRegistration extends AbstractObjectRegistration<org.opendaylight.yangtools.yang.binding.NotificationListener> implements ListenerRegistration<org.opendaylight.yangtools.yang.binding.NotificationListener> {

    // Invoker associated with the listener; the broker reads it as reg.invoker
    // when unregistering.
    // NOTE(review): the @Property annotation and the super(...) call below
    // were not visible in the extracted listing; they are required by the
    // `_invoker` assignment and the superclass respectively — confirm.
    @Property
    val NotificationInvoker invoker;

    // Broker that owns this registration; set to null after removal.
    var NotificationBrokerImpl notificationBroker;

    /**
     * @param instance the registered listener
     * @param invoker invoker created for the listener
     * @param broker broker that owns the registration
     */
    new(org.opendaylight.yangtools.yang.binding.NotificationListener instance, NotificationInvoker invoker, NotificationBrokerImpl broker) {
        super(instance);
        _invoker = invoker;
        notificationBroker = broker;
    }

    /** Unregisters this registration from the broker and releases the broker reference. */
    override protected removeRegistration() {
        notificationBroker.unregisterListener(this);
        notificationBroker = null;
    }
}
\r
206 class NotifyTask implements Callable<Object> {
\r
208 private static val log = LoggerFactory.getLogger(NotifyTask);
\r
210 @SuppressWarnings("rawtypes")
\r
211 val NotificationListener listener;
\r
212 val Notification notification;
\r
215 //Only logging the complete notification in debug mode
\r
217 if(log.isDebugEnabled){
\r
218 log.debug("Delivering notification {} to {}",notification,listener);
\r
220 log.trace("Delivering notification {} to {}",notification.class.name,listener);
\r
222 listener.onNotification(notification);
\r
223 if(log.isDebugEnabled){
\r
224 log.debug("Notification delivered {} to {}",notification,listener);
\r
226 log.trace("Notification delivered {} to {}",notification.class.name,listener);
\r
228 } catch (Exception e) {
\r
229 log.error("Unhandled exception thrown by listener: {}", listener, e);
\r