2 * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved.
\r
4 * This program and the accompanying materials are made available under the
\r
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
\r
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
\r
8 package org.opendaylight.controller.sal.binding.impl
\r
10 import com.google.common.collect.HashMultimap
\r
11 import com.google.common.collect.ImmutableSet
\r
12 import com.google.common.collect.Multimap
\r
13 import com.google.common.collect.Multimaps
\r
14 import java.util.Collections
\r
15 import java.util.concurrent.Callable
\r
16 import java.util.concurrent.ExecutorService
\r
17 import java.util.concurrent.Future
\r
18 import java.util.Set
\r
19 import org.opendaylight.controller.sal.binding.api.NotificationListener
\r
20 import org.opendaylight.controller.sal.binding.api.NotificationProviderService
\r
21 import org.opendaylight.controller.sal.binding.api.NotificationProviderService.NotificationInterestListener
\r
22 import org.opendaylight.controller.sal.binding.codegen.impl.SingletonHolder
\r
23 import org.opendaylight.controller.sal.binding.spi.NotificationInvokerFactory.NotificationInvoker
\r
24 import org.opendaylight.yangtools.concepts.AbstractObjectRegistration
\r
25 import org.opendaylight.yangtools.concepts.ListenerRegistration
\r
26 import org.opendaylight.yangtools.concepts.Registration
\r
27 import org.opendaylight.yangtools.concepts.util.ListenerRegistry
\r
28 import org.opendaylight.yangtools.yang.binding.Notification
\r
29 import org.slf4j.LoggerFactory
\r
31 class NotificationBrokerImpl implements NotificationProviderService, AutoCloseable {
\r
33 val ListenerRegistry<NotificationInterestListener> interestListeners = ListenerRegistry.create;
\r
35 val Multimap<Class<? extends Notification>, NotificationListener<?>> listeners;
\r
38 var ExecutorService executor;
\r
40 val logger = LoggerFactory.getLogger(NotificationBrokerImpl)
\r
43 listeners = Multimaps.synchronizedSetMultimap(HashMultimap.create())
\r
47 new(ExecutorService executor) {
\r
48 listeners = Multimaps.synchronizedSetMultimap(HashMultimap.create())
\r
49 this.executor = executor;
\r
52 def getNotificationTypes(Notification notification) {
\r
53 notification.class.interfaces.filter[it != Notification && Notification.isAssignableFrom(it)]
\r
56 override publish(Notification notification) {
\r
57 publish(notification, executor)
\r
60 override publish(Notification notification, ExecutorService service) {
\r
61 val allTypes = notification.notificationTypes
\r
63 var Iterable<NotificationListener<? extends Object>> listenerToNotify = Collections.emptySet();
\r
64 for (type : allTypes) {
\r
65 listenerToNotify = listenerToNotify + listeners.get(type as Class<? extends Notification>)
\r
67 val tasks = listenerToNotify.map[new NotifyTask(it, notification)].toSet;
\r
68 submitAll(executor,tasks);
\r
71 def submitAll(ExecutorService service, Set<NotifyTask> tasks) {
\r
72 val ret = ImmutableSet.<Future<Object>>builder();
\r
74 ret.add(service.submit(task));
\r
79 override <T extends Notification> registerNotificationListener(Class<T> notificationType,
\r
80 NotificationListener<T> listener) {
\r
81 val reg = new GenericNotificationRegistration<T>(notificationType, listener, this);
\r
82 listeners.put(notificationType, listener);
\r
83 announceNotificationSubscription(notificationType);
\r
87 def announceNotificationSubscription(Class<? extends Notification> notification) {
\r
88 for (listener : interestListeners) {
\r
90 listener.instance.onNotificationSubscribtion(notification);
\r
91 } catch (Exception e) {
\r
92 logger.error("", e.message)
\r
97 override registerNotificationListener(
\r
98 org.opendaylight.yangtools.yang.binding.NotificationListener listener) {
\r
99 val invoker = SingletonHolder.INVOKER_FACTORY.invokerFor(listener);
\r
100 for (notifyType : invoker.supportedNotifications) {
\r
101 listeners.put(notifyType, invoker.invocationProxy)
\r
102 announceNotificationSubscription(notifyType)
\r
104 val registration = new GeneratedListenerRegistration(listener, invoker,this);
\r
105 return registration as Registration<org.opendaylight.yangtools.yang.binding.NotificationListener>;
\r
108 protected def unregisterListener(GenericNotificationRegistration<?> reg) {
\r
109 listeners.remove(reg.type, reg.instance);
\r
112 protected def unregisterListener(GeneratedListenerRegistration reg) {
\r
113 for (notifyType : reg.invoker.supportedNotifications) {
\r
114 listeners.remove(notifyType, reg.invoker.invocationProxy)
\r
119 //FIXME: implement properly.
\r
122 override registerInterestListener(NotificationInterestListener interestListener) {
\r
123 val registration = interestListeners.register(interestListener);
\r
125 for(notification : listeners.keySet) {
\r
126 interestListener.onNotificationSubscribtion(notification);
\r
128 return registration
\r
132 class GenericNotificationRegistration<T extends Notification> extends AbstractObjectRegistration<NotificationListener<T>> implements ListenerRegistration<NotificationListener<T>> {
\r
137 var NotificationBrokerImpl notificationBroker;
\r
139 public new(Class<T> type, NotificationListener<T> instance, NotificationBrokerImpl broker) {
\r
142 notificationBroker = broker;
\r
145 override protected removeRegistration() {
\r
146 notificationBroker.unregisterListener(this);
\r
147 notificationBroker = null;
\r
151 class GeneratedListenerRegistration extends AbstractObjectRegistration<org.opendaylight.yangtools.yang.binding.NotificationListener> implements ListenerRegistration<org.opendaylight.yangtools.yang.binding.NotificationListener> {
\r
154 val NotificationInvoker invoker;
\r
156 var NotificationBrokerImpl notificationBroker;
\r
159 new(org.opendaylight.yangtools.yang.binding.NotificationListener instance, NotificationInvoker invoker, NotificationBrokerImpl broker) {
\r
161 _invoker = invoker;
\r
162 notificationBroker = broker;
\r
165 override protected removeRegistration() {
\r
166 notificationBroker.unregisterListener(this);
\r
167 notificationBroker = null;
\r
173 class NotifyTask implements Callable<Object> {
\r
175 private static val log = LoggerFactory.getLogger(NotifyTask);
\r
177 @SuppressWarnings("rawtypes")
\r
178 val NotificationListener listener;
\r
179 val Notification notification;
\r
182 //Only logging the complete notification in debug mode
\r
184 if(log.isDebugEnabled){
\r
185 log.debug("Delivering notification {} to {}",notification,listener);
\r
187 log.trace("Delivering notification {} to {}",notification.class.name,listener);
\r
189 listener.onNotification(notification);
\r
190 if(log.isDebugEnabled){
\r
191 log.debug("Notification delivered {} to {}",notification,listener);
\r
193 log.trace("Notification delivered {} to {}",notification.class.name,listener);
\r
195 } catch (Exception e) {
\r
196 log.error("Unhandled exception thrown by listener: {}", listener, e);
\r