/*
 * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved.
 *
 * This program and the accompanying materials are made available under the
 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
 * and is available at http://www.eclipse.org/legal/epl-v10.html
 */
\r
8 package org.opendaylight.controller.sal.binding.impl
\r
10 import com.google.common.collect.HashMultimap
\r
11 import com.google.common.collect.Multimap
\r
12 import java.util.Collection
\r
13 import java.util.Collections
\r
14 import java.util.concurrent.Callable
\r
15 import java.util.concurrent.ExecutorService
\r
16 import org.opendaylight.controller.sal.binding.api.NotificationListener
\r
17 import org.opendaylight.controller.sal.binding.api.NotificationProviderService
\r
18 import org.opendaylight.controller.sal.binding.spi.NotificationInvokerFactory.NotificationInvoker
\r
19 import org.opendaylight.yangtools.concepts.AbstractObjectRegistration
\r
20 import org.opendaylight.yangtools.concepts.ListenerRegistration
\r
21 import org.opendaylight.yangtools.concepts.Registration
\r
22 import org.opendaylight.yangtools.yang.binding.Notification
\r
23 import org.slf4j.LoggerFactory
\r
24 import org.opendaylight.controller.sal.binding.codegen.impl.SingletonHolder
\rimport com.google.common.collect.Multimaps
\r
25 import org.opendaylight.yangtools.concepts.util.ListenerRegistry
\r
26 import org.opendaylight.controller.sal.binding.api.NotificationProviderService.NotificationInterestListener
\rimport java.util.Set
\r
27 import java.util.Set
\r
28 import com.google.common.collect.ImmutableSet
\r
29 import java.util.concurrent.Future
\r
/**
 * Notification broker that keeps a map from notification type to registered
 * listeners and delivers each published notification to the matching
 * listeners by submitting one {@link NotifyTask} per listener to an
 * {@link ExecutorService}.
 */
class NotificationBrokerImpl implements NotificationProviderService, AutoCloseable {

    /** Listeners that are told whenever a notification type gains a subscriber. */
    val ListenerRegistry<NotificationInterestListener> interestListeners = ListenerRegistry.create;

    /** Registered listeners keyed by notification type (a synchronized set multimap). */
    val Multimap<Class<? extends Notification>, NotificationListener<?>> listeners;

    // Default executor used by publish(Notification).
    // NOTE(review): the no-arg constructor leaves this unset, so it is exposed as a
    // property to allow injection after construction - confirm against callers.
    @Property
    var ExecutorService executor;

    val logger = LoggerFactory.getLogger(NotificationBrokerImpl)

    /** Creates a broker without an executor; one has to be supplied before publishing. */
    new() {
        listeners = Multimaps.synchronizedSetMultimap(HashMultimap.create())
    }

    /**
     * Creates a broker that delivers notifications on the given executor.
     *
     * @param executor default executor for notification delivery
     */
    new(ExecutorService executor) {
        this()
        this.executor = executor;
    }

    /**
     * Adds a listener for the given notification type. Unlike
     * registerNotificationListener this neither returns a registration nor
     * announces the subscription to interest listeners.
     */
    override <T extends Notification> addNotificationListener(Class<T> notificationType,
        NotificationListener<T> listener) {
        listeners.put(notificationType, listener)
    }

    /** Removes a listener previously added for the given notification type. */
    override <T extends Notification> removeNotificationListener(Class<T> notificationType,
        NotificationListener<T> listener) {
        listeners.remove(notificationType, listener)
    }

    /** Delegates to {@link #publish(Notification)}. */
    override notify(Notification notification) {
        publish(notification)
    }

    /**
     * Returns the interfaces declared directly on the notification's class that
     * are subtypes of {@link Notification}, excluding Notification itself.
     */
    def getNotificationTypes(Notification notification) {
        notification.class.interfaces.filter[it != Notification && Notification.isAssignableFrom(it)]
    }

    /** Synchronously invokes every given listener with the notification. */
    @SuppressWarnings("unchecked")
    private def notifyAll(Collection<NotificationListener<?>> listeners, Notification notification) {
        listeners.forEach[(it as NotificationListener).onNotification(notification)]
    }

    /** Unsupported; always throws UnsupportedOperationException. */
    @Deprecated
    override addNotificationListener(org.opendaylight.yangtools.yang.binding.NotificationListener listener) {
        throw new UnsupportedOperationException("Deprecated method. Use registerNotificationListener instead.");
    }

    /** Unsupported; always throws UnsupportedOperationException. */
    @Deprecated
    override removeNotificationListener(org.opendaylight.yangtools.yang.binding.NotificationListener listener) {
        throw new UnsupportedOperationException(
            "Deprecated method. Use RegisterNotificationListener returned value to close registration.")
    }

    /** Delegates to {@link #publish(Notification, ExecutorService)}. */
    override notify(Notification notification, ExecutorService service) {
        publish(notification, service)
    }

    /** Publishes the notification using the broker's default executor. */
    override publish(Notification notification) {
        publish(notification, executor)
    }

    /**
     * Publishes the notification to every listener registered for any of its
     * notification types, submitting one task per listener.
     *
     * @param notification notification to deliver
     * @param service executor to deliver on; when null the broker's default
     *        executor is used
     */
    override publish(Notification notification, ExecutorService service) {
        // Honour the executor supplied by the caller; previously it was ignored
        // and the default executor was always used.
        val targetExecutor = service ?: executor

        // NOTE(review): Guava requires holding the synchronized multimap's monitor
        // while iterating its views; the views returned by get() are iterated
        // below (in toSet) without it - confirm whether concurrent registration
        // during publish is possible.
        var Iterable<NotificationListener<?>> recipients = Collections.emptySet();
        for (type : notification.notificationTypes) {
            recipients = recipients + listeners.get(type as Class<? extends Notification>)
        }
        val tasks = recipients.map[new NotifyTask(it, notification)].toSet;
        submitAll(targetExecutor, tasks);
    }

    /**
     * Submits every task to the given executor.
     *
     * @return immutable set of the futures returned by the executor
     */
    def submitAll(ExecutorService service, Set<NotifyTask> tasks) {
        val futures = ImmutableSet.<Future<Object>>builder();
        for (task : tasks) {
            futures.add(service.submit(task));
        }
        return futures.build();
    }

    /**
     * Registers a listener for one notification type and announces the
     * subscription to all interest listeners.
     *
     * @return registration object created for the listener
     */
    override <T extends Notification> registerNotificationListener(Class<T> notificationType,
        NotificationListener<T> listener) {
        val reg = new GenericNotificationRegistration<T>(notificationType, listener, this);
        listeners.put(notificationType, listener);
        announceNotificationSubscription(notificationType);
        return reg;
    }

    /**
     * Tells every interest listener that the given notification type has a
     * subscriber. A failure in one interest listener is logged and does not
     * prevent the remaining ones from being called.
     */
    def announceNotificationSubscription(Class<? extends Notification> notification) {
        for (listener : interestListeners) {
            try {
                listener.instance.onNotificationSubscribtion(notification);
            } catch (Exception e) {
                // Pass the exception itself so the stack trace is not lost.
                logger.error("Interest listener {} failed while being notified of subscription to {}",
                    listener.instance, notification, e)
            }
        }
    }

    /**
     * Registers a generated listener: obtains an invoker for it, registers the
     * invoker's proxy under every notification type the invoker supports and
     * announces each subscription.
     *
     * @return registration wrapping the listener and its invoker
     */
    override registerNotificationListener(
        org.opendaylight.yangtools.yang.binding.NotificationListener listener) {
        val invoker = SingletonHolder.INVOKER_FACTORY.invokerFor(listener);
        for (notifyType : invoker.supportedNotifications) {
            listeners.put(notifyType, invoker.invocationProxy)
            announceNotificationSubscription(notifyType)
        }
        val registration = new GeneratedListenerRegistration(listener, invoker, this);
        return registration as Registration<org.opendaylight.yangtools.yang.binding.NotificationListener>;
    }

    /** Removes the listener held by a typed registration. */
    protected def unregisterListener(GenericNotificationRegistration<?> reg) {
        listeners.remove(reg.type, reg.instance);
    }

    /** Removes the invoker proxy from every notification type the invoker supports. */
    protected def unregisterListener(GeneratedListenerRegistration reg) {
        for (notifyType : reg.invoker.supportedNotifications) {
            listeners.remove(notifyType, reg.invoker.invocationProxy)
        }
    }

    /** Currently does nothing. */
    override close() {
        //FIXME: implement properly.
    }

    /**
     * Registers an interest listener and immediately reports to it every
     * notification type that currently has a registered listener.
     *
     * @return registration of the interest listener
     */
    override registerInterestListener(NotificationInterestListener interestListener) {
        val registration = interestListeners.register(interestListener);

        // NOTE(review): keySet is a view of a synchronized multimap and is iterated
        // here without holding the multimap's monitor - confirm this is safe.
        for (notification : listeners.keySet) {
            interestListener.onNotificationSubscribtion(notification);
        }
        return registration
    }
}
\r
/**
 * Registration of a listener for a single notification type. Removing the
 * registration unregisters the listener from the broker that created it.
 */
class GenericNotificationRegistration<T extends Notification> extends AbstractObjectRegistration<NotificationListener<T>> implements ListenerRegistration<NotificationListener<T>> {

    // Notification type the listener was registered for; the broker reads it
    // back (reg.type) when unregistering.
    @Property
    val Class<T> type;

    // Broker to unregister from; cleared once the registration has been removed.
    var NotificationBrokerImpl notificationBroker;

    /**
     * @param type notification type the listener is registered for
     * @param instance the registered listener
     * @param broker broker that owns this registration
     */
    public new(Class<T> type, NotificationListener<T> instance, NotificationBrokerImpl broker) {
        super(instance);
        _type = type;
        notificationBroker = broker;
    }

    /**
     * Unregisters the listener from the broker and drops the broker reference.
     * The null-safe call makes a repeated invocation a no-op instead of a
     * NullPointerException.
     */
    override protected removeRegistration() {
        notificationBroker?.unregisterListener(this);
        notificationBroker = null;
    }
}
\r
/**
 * Registration of a generated notification listener together with the invoker
 * whose proxy was registered in the broker on its behalf.
 */
class GeneratedListenerRegistration extends AbstractObjectRegistration<org.opendaylight.yangtools.yang.binding.NotificationListener> implements ListenerRegistration<org.opendaylight.yangtools.yang.binding.NotificationListener> {

    // Invoker created for the listener; the broker reads it back (reg.invoker)
    // to find the notification types and proxy to unregister.
    @Property
    val NotificationInvoker invoker;

    // Broker to unregister from; cleared once the registration has been removed.
    var NotificationBrokerImpl notificationBroker;

    /**
     * @param instance the registered listener
     * @param invoker invoker created for the listener
     * @param broker broker that owns this registration
     */
    new(org.opendaylight.yangtools.yang.binding.NotificationListener instance, NotificationInvoker invoker, NotificationBrokerImpl broker) {
        super(instance);
        _invoker = invoker;
        notificationBroker = broker;
    }

    /**
     * Unregisters the invoker proxy from the broker and drops the broker
     * reference. The null-safe call makes a repeated invocation a no-op
     * instead of a NullPointerException.
     */
    override protected removeRegistration() {
        notificationBroker?.unregisterListener(this);
        notificationBroker = null;
    }
}
\r
/**
 * Task that delivers one notification to one listener. Exceptions thrown by
 * the listener are logged and not propagated.
 */
// NOTE(review): @Data supplies the two-argument constructor the broker calls
// (new NotifyTask(listener, notification)) for the uninitialised val fields -
// confirm it matches the annotation used in the project.
@Data
class NotifyTask implements Callable<Object> {

    private static val log = LoggerFactory.getLogger(NotifyTask);

    @SuppressWarnings("rawtypes")
    val NotificationListener listener;
    val Notification notification;

    /**
     * Invokes the listener with the notification, logging before and after.
     *
     * @return always null
     */
    override call() {
        //Only logging the complete notification in debug mode
        try {
            if (log.isDebugEnabled) {
                log.debug("Delivering notification {} to {}",notification,listener);
            } else {
                log.trace("Delivering notification {} to {}",notification.class.name,listener);
            }
            listener.onNotification(notification);
            if (log.isDebugEnabled) {
                log.debug("Notification delivered {} to {}",notification,listener);
            } else {
                log.trace("Notification delivered {} to {}",notification.class.name,listener);
            }
        } catch (Exception e) {
            // A failing listener must not fail the task; record it and carry on.
            log.error("Unhandled exception thrown by listener: {}", listener, e);
        }
        return null;
    }
}
\r