2 * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.controller.sal.binding.impl
10 import org.opendaylight.controller.sal.binding.api.NotificationProviderService
11 import org.opendaylight.yangtools.yang.binding.Notification
12 import com.google.common.collect.Multimap
13 import org.opendaylight.controller.sal.binding.api.NotificationListener
14 import com.google.common.collect.HashMultimap
15 import java.util.concurrent.ExecutorService
16 import java.util.Collection
17 import org.opendaylight.yangtools.concepts.Registration
18 import org.opendaylight.controller.sal.binding.codegen.RuntimeCodeGenerator
19 import org.opendaylight.controller.sal.binding.spi.NotificationInvokerFactory
20 import org.opendaylight.yangtools.concepts.ListenerRegistration
21 import org.opendaylight.controller.sal.binding.spi.NotificationInvokerFactory.NotificationInvoker
22 import org.opendaylight.yangtools.concepts.AbstractObjectRegistration
23 import java.util.Collections
24 import org.slf4j.LoggerFactory
25 import java.util.concurrent.Callable
/**
 * Notification broker: keeps a registry of listeners keyed by notification
 * type and turns each published notification into per-listener delivery tasks
 * that are handed to an executor.
 */
27 class NotificationBrokerImpl implements NotificationProviderService, AutoCloseable {
// Registry of listeners per notification type; assigned in the constructor.
29 val Multimap<Class<? extends Notification>, NotificationListener<?>> listeners;
// Default executor supplied at construction; publish(Notification) forwards it
// to publish(Notification, ExecutorService).
32 var ExecutorService executor;
/**
 * Creates a broker with an empty listener registry.
 *
 * @param executor executor stored as this broker's default for publishing
 */
new(ExecutorService executor) {
    this.executor = executor
    this.listeners = HashMultimap.create()
}
/**
 * Stores {@code listener} in the registry under {@code notificationType}.
 *
 * @param notificationType notification class the listener is registered for
 * @param listener listener to add
 */
override <T extends Notification> addNotificationListener(
    Class<T> notificationType,
    NotificationListener<T> listener
) {
    this.listeners.put(notificationType, listener)
}
/**
 * Removes the ({@code notificationType}, {@code listener}) pair from the registry.
 *
 * @param notificationType notification class the listener was registered for
 * @param listener listener to remove
 */
override <T extends Notification> removeNotificationListener(
    Class<T> notificationType,
    NotificationListener<T> listener
) {
    this.listeners.remove(notificationType, listener)
}
// Single-argument notify(Notification) override.
// NOTE(review): the body is not visible in this excerpt; presumably it forwards
// to publish(notification) — confirm.
51 override notify(Notification notification) {
/**
 * Returns the interfaces listed by the notification's runtime class that are
 * assignable to {@code Notification}, excluding {@code Notification} itself.
 *
 * Only the interfaces reported by {@code Class.getInterfaces()} for the
 * runtime class are examined.
 *
 * @param notification notification whose types are looked up
 */
def getNotificationTypes(Notification notification) {
    val declaredInterfaces = notification.class.interfaces
    declaredInterfaces.filter [ candidate |
        Notification.isAssignableFrom(candidate) && candidate != Notification
    ]
}
/**
 * Invokes {@code onNotification} on every listener in the given collection,
 * in the collection's iteration order, on the calling thread.
 *
 * @param listeners listeners to invoke (this parameter shadows the registry field)
 * @param notification notification passed to each listener
 */
@SuppressWarnings("unchecked")
private def notifyAll(Collection<NotificationListener<?>> listeners, Notification notification) {
    for (target : listeners) {
        // Raw cast: the listener's type argument is unknown at this point.
        (target as NotificationListener).onNotification(notification)
    }
}
/**
 * Unsupported: always throws.
 *
 * @throws UnsupportedOperationException on every call
 */
override addNotificationListener(org.opendaylight.yangtools.yang.binding.NotificationListener listener) {
    val message = "Deprecated method. Use registerNotificationListener instead."
    throw new UnsupportedOperationException(message)
}
/**
 * Unsupported: always throws.
 *
 * @throws UnsupportedOperationException on every call
 */
override removeNotificationListener(org.opendaylight.yangtools.yang.binding.NotificationListener listener) {
    val message = "Deprecated method. Use RegisterNotificationListener returned value to close registration."
    throw new UnsupportedOperationException(message)
}
/**
 * Forwards to {@code publish(notification, service)}.
 *
 * @param notification notification to deliver
 * @param service executor passed through to publish
 */
override notify(Notification notification, ExecutorService service) {
    this.publish(notification, service)
}
/**
 * Publishes {@code notification} using this broker's own executor.
 *
 * @param notification notification to deliver
 */
override publish(Notification notification) {
    this.publish(notification, this.executor)
}
/**
 * Delivers {@code notification} to every listener registered under one of its
 * notification types, running the deliveries on {@code service}.
 *
 * {@code ExecutorService.invokeAll} is used, so this call does not return
 * until all delivery tasks have completed.
 *
 * @param notification notification to deliver
 * @param service executor that runs the delivery tasks; when {@code null} the
 *        broker's own executor is used instead
 */
override publish(Notification notification, ExecutorService service) {
    // The supplied executor used to be ignored in favour of the broker's own;
    // honour it, and keep the old behaviour only when no executor is given.
    val deliveryExecutor = service ?: executor

    var Iterable<NotificationListener<?>> listenerToNotify = Collections.emptySet();
    for (type : notification.notificationTypes) {
        listenerToNotify = listenerToNotify + listeners.get(type as Class<? extends Notification>)
    }

    // One task per listener; collecting into a set drops tasks that compare equal.
    val tasks = listenerToNotify.map[new NotifyTask(it, notification)].toSet;
    deliveryExecutor.invokeAll(tasks);
}
/**
 * Registers {@code listener} for notifications of {@code notificationType}.
 *
 * Builds a GenericNotificationRegistration tied to this broker and stores the
 * listener in the registry under the given type.
 * NOTE(review): the statement that hands {@code reg} back to the caller is not
 * visible in this excerpt — confirm it is returned.
 */
96 override <T extends Notification> registerNotificationListener(Class<T> notificationType,
97 NotificationListener<T> listener) {
98 val reg = new GenericNotificationRegistration<T>(notificationType, listener, this);
99 listeners.put(notificationType, listener);
/**
 * Registers a binding-level listener.
 *
 * Obtains an invoker for {@code listener} from the BindingAwareBrokerImpl
 * generator, stores the invoker's invocation proxy in the registry under each
 * notification type the invoker reports as supported, and returns a
 * registration tied to this broker.
 *
 * @param listener listener to register
 * @return a new GeneratedListenerRegistration, cast to {@code Registration}
 */
override registerNotificationListener(
    org.opendaylight.yangtools.yang.binding.NotificationListener listener
) {
    val listenerInvoker = BindingAwareBrokerImpl.generator.invokerFactory.invokerFor(listener)
    for (supportedType : listenerInvoker.supportedNotifications) {
        listeners.put(supportedType, listenerInvoker.invocationProxy)
    }
    return new GeneratedListenerRegistration(listener, listenerInvoker, this)
        as Registration<org.opendaylight.yangtools.yang.binding.NotificationListener>
}
/**
 * Removes the listener held by {@code reg} from the registry entry for the
 * registration's notification type.
 *
 * @param reg registration being removed
 */
protected def unregisterListener(GenericNotificationRegistration<?> reg) {
    val registeredType = reg.type
    val registeredListener = reg.instance
    listeners.remove(registeredType, registeredListener)
}
/**
 * Removes the invocation proxy of {@code reg}'s invoker from the registry for
 * every notification type that invoker reports as supported.
 *
 * @param reg registration being removed
 */
protected def unregisterListener(GeneratedListenerRegistration reg) {
    val registeredInvoker = reg.invoker
    for (supportedType : registeredInvoker.supportedNotifications) {
        listeners.remove(supportedType, registeredInvoker.invocationProxy)
    }
}
124 //FIXME: implement properly.
/**
 * Registration handle for a listener that was registered with a
 * NotificationBrokerImpl for one notification type.
 */
129 class GenericNotificationRegistration<T extends Notification> extends AbstractObjectRegistration<NotificationListener<T>> implements ListenerRegistration<NotificationListener<T>> {
// Broker that created this registration; cleared by removeRegistration().
134 var NotificationBrokerImpl notificationBroker;
// Takes the notification type, the listener instance and the owning broker.
// NOTE(review): only the broker assignment is visible in this excerpt; the
// handling of `type` and `instance` is not shown.
136 public new(Class<T> type, NotificationListener<T> instance, NotificationBrokerImpl broker) {
139 notificationBroker = broker;
// Unregisters this registration from the broker, then drops the broker reference.
// NOTE(review): notificationBroker is null afterwards, so a second invocation
// would fail on the first statement — confirm the base class calls this at most once.
142 override protected removeRegistration() {
143 notificationBroker.unregisterListener(this);
144 notificationBroker = null;
/**
 * Registration handle for a binding-level NotificationListener that was
 * registered with a NotificationBrokerImpl through a NotificationInvoker.
 */
148 class GeneratedListenerRegistration extends AbstractObjectRegistration<org.opendaylight.yangtools.yang.binding.NotificationListener> implements ListenerRegistration<org.opendaylight.yangtools.yang.binding.NotificationListener> {
// Invoker associated with the listener; the broker reads it when unregistering.
151 val NotificationInvoker invoker;
// Broker that created this registration; cleared by removeRegistration().
153 var NotificationBrokerImpl notificationBroker;
// Takes the listener instance, its invoker and the owning broker.
// NOTE(review): only the broker assignment is visible in this excerpt; the
// handling of `instance` and `invoker` is not shown.
156 new(org.opendaylight.yangtools.yang.binding.NotificationListener instance, NotificationInvoker invoker, NotificationBrokerImpl broker) {
159 notificationBroker = broker;
// Unregisters this registration from the broker, then drops the broker reference.
// NOTE(review): notificationBroker is null afterwards, so a second invocation
// would fail on the first statement — confirm the base class calls this at most once.
162 override protected removeRegistration() {
163 notificationBroker.unregisterListener(this);
164 notificationBroker = null;
/**
 * Callable that delivers one notification to one listener, logging before and
 * after the delivery and logging any exception raised by the listener.
 */
170 class NotifyTask implements Callable<Object> {
172 private static val log = LoggerFactory.getLogger(NotifyTask);
// Listener that receives the notification (raw type).
174 val NotificationListener listener;
// Notification handed to the listener.
175 val Notification notification;
// NOTE(review): the enclosing method header and `try` are not visible in this
// excerpt; presumably this is the body of call() — confirm.
// NOTE(review): two info-level log lines per delivery may be noisy — consider debug.
179 log.info("Delivering notification {} to {}",notification,listener);
180 listener.onNotification(notification);
181 log.info("Notification delivered {} to {}",notification,listener);
182 } catch (Exception e) {
// The listener fills the single {} placeholder; the trailing exception is passed
// as the last argument so SLF4J logs it as the throwable.
183 log.error("Unhandled exception thrown by listener: {}", listener, e);