Merge changes Ibc19654a,I17a345ea,Id0377e20
[controller.git] / opendaylight / md-sal / sal-binding-broker / src / main / java / org / opendaylight / controller / sal / binding / impl / NotificationBrokerImpl.xtend
1 /*
2  * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.sal.binding.impl
9
10 import org.opendaylight.controller.sal.binding.api.NotificationProviderService
11 import org.opendaylight.yangtools.yang.binding.Notification
12 import com.google.common.collect.Multimap
13 import org.opendaylight.controller.sal.binding.api.NotificationListener
14 import com.google.common.collect.HashMultimap
15 import java.util.concurrent.ExecutorService
16 import java.util.Collection
17 import org.opendaylight.yangtools.concepts.Registration
18 import org.opendaylight.controller.sal.binding.codegen.RuntimeCodeGenerator
19 import org.opendaylight.controller.sal.binding.spi.NotificationInvokerFactory
20 import org.opendaylight.yangtools.concepts.ListenerRegistration
21 import org.opendaylight.controller.sal.binding.spi.NotificationInvokerFactory.NotificationInvoker
22 import org.opendaylight.yangtools.concepts.AbstractObjectRegistration
23 import java.util.Collections
24 import org.slf4j.LoggerFactory
25 import java.util.concurrent.Callable
26
/**
 * Binding-aware notification broker.
 *
 * Keeps a multimap from notification type to the listeners registered for that
 * type and delivers published notifications to them by wrapping every
 * (listener, notification) pair in a {@link NotifyTask} and handing the tasks
 * to an {@link ExecutorService}.
 *
 * NOTE(review): the listener multimap is a plain HashMultimap and is read and
 * written without synchronization in this class - confirm that callers
 * serialize registration and publication.
 */
class NotificationBrokerImpl implements NotificationProviderService {

    /** Registered listeners, keyed by the notification type they were registered for. */
    val Multimap<Class<? extends Notification>, NotificationListener<?>> listeners;

    /** Default executor used when a notification is published without an explicit one. */
    @Property
    var ExecutorService executor;

    /** Runtime code generator; not used directly by the code in this class. */
    @Property
    var RuntimeCodeGenerator generator;

    /** Factory producing invokers for generated (YANG binding) notification listeners. */
    @Property
    var NotificationInvokerFactory invokerFactory;

    /**
     * Creates a broker with an empty listener registry.
     *
     * @param executor default executor used to deliver notifications
     */
    new(ExecutorService executor) {
        listeners = HashMultimap.create()
        this.executor = executor;
    }

    /**
     * Adds the listener to the registry under the given notification type.
     */
    @Deprecated
    override <T extends Notification> addNotificationListener(Class<T> notificationType,
        NotificationListener<T> listener) {
        listeners.put(notificationType, listener)
    }

    /**
     * Removes the (type, listener) pair from the registry.
     */
    @Deprecated
    override <T extends Notification> removeNotificationListener(Class<T> notificationType,
        NotificationListener<T> listener) {
        listeners.remove(notificationType, listener)
    }

    /**
     * Delegates to {@link #publish(Notification)}.
     */
    override notify(Notification notification) {
        publish(notification)
    }

    /**
     * Returns the interfaces directly implemented by the notification's class
     * that are sub-types of {@link Notification}, excluding {@link Notification}
     * itself.
     */
    def getNotificationTypes(Notification notification) {
        notification.class.interfaces.filter[it != Notification && Notification.isAssignableFrom(it)]
    }

    /**
     * Synchronously invokes {@code onNotification} on every listener in the collection.
     */
    @SuppressWarnings("unchecked")
    private def notifyAll(Collection<NotificationListener<?>> listeners, Notification notification) {
        listeners.forEach[(it as NotificationListener).onNotification(notification)]
    }

    /**
     * Not supported.
     *
     * @throws UnsupportedOperationException always
     */
    @Deprecated
    override addNotificationListener(org.opendaylight.yangtools.yang.binding.NotificationListener listener) {
        throw new UnsupportedOperationException("Deprecated method. Use registerNotificationListener instead.");

    }

    /**
     * Not supported.
     *
     * @throws UnsupportedOperationException always
     */
    @Deprecated
    override removeNotificationListener(org.opendaylight.yangtools.yang.binding.NotificationListener listener) {
        throw new UnsupportedOperationException(
            "Deprecated method. Use RegisterNotificationListener returned value to close registration.")
    }

    /**
     * Delegates to {@link #publish(Notification, ExecutorService)}.
     */
    @Deprecated
    override notify(Notification notification, ExecutorService service) {
        publish(notification, service)
    }

    /**
     * Publishes the notification using the broker's default executor.
     */
    override publish(Notification notification) {
        publish(notification, executor)
    }

    /**
     * Publishes the notification to every listener registered for any of its
     * notification types.
     *
     * The listeners are collected into a set of {@link NotifyTask}s, which is
     * then passed to {@code invokeAll} on the chosen executor.
     *
     * @param notification notification to deliver
     * @param service executor that runs the listener callbacks; when
     *        {@code null} the broker's default executor is used
     */
    override publish(Notification notification, ExecutorService service) {
        val allTypes = notification.notificationTypes

        var Iterable<NotificationListener<?>> listenerToNotify = Collections.emptySet();
        for (type : allTypes) {
            listenerToNotify = listenerToNotify + listeners.get(type as Class<? extends Notification>)
        }
        // toSet materializes the lazily concatenated view before the tasks are submitted.
        val tasks = listenerToNotify.map[new NotifyTask(it, notification)].toSet;
        // Honour the caller-supplied executor; previously it was ignored and the
        // default executor was always used. Fall back to the default for null.
        val deliveryExecutor = service ?: executor
        deliveryExecutor.invokeAll(tasks);
    }

    /**
     * Registers the listener for the given notification type.
     *
     * @return registration object constructed with this broker, the type and the listener
     */
    override <T extends Notification> registerNotificationListener(Class<T> notificationType,
        NotificationListener<T> listener) {
        val reg = new GenericNotificationRegistration<T>(notificationType, listener, this);
        listeners.put(notificationType, listener);
        return reg;
    }

    /**
     * Registers a generated listener: obtains an invoker for it from the
     * invoker factory and stores the invoker's invocation proxy under every
     * notification type the invoker reports as supported.
     *
     * @return registration object constructed with this broker, the listener and the invoker
     */
    override registerNotificationListener(
        org.opendaylight.yangtools.yang.binding.NotificationListener listener) {
        val invoker = invokerFactory.invokerFor(listener);
        for (notifyType : invoker.supportedNotifications) {
            listeners.put(notifyType, invoker.invocationProxy)
        }
        val registration = new GeneratedListenerRegistration(listener, invoker,this);
        return registration as Registration<org.opendaylight.yangtools.yang.binding.NotificationListener>;
    }

    /**
     * Removes the (type, listener instance) pair held by the registration from the registry.
     */
    protected def unregisterListener(GenericNotificationRegistration<?> reg) {
        listeners.remove(reg.type, reg.instance);
    }

    /**
     * Removes the invoker's invocation proxy from every notification type the
     * invoker reports as supported.
     */
    protected def unregisterListener(GeneratedListenerRegistration reg) {
        for (notifyType : reg.invoker.supportedNotifications) {
            listeners.remove(notifyType, reg.invoker.invocationProxy)
        }
    }
}
129
/**
 * Registration handle for a listener registered for a single notification type.
 *
 * Removing the registration asks the owning broker to unregister the
 * (type, listener) pair and then drops the reference to the broker.
 */
class GenericNotificationRegistration<T extends Notification> extends AbstractObjectRegistration<NotificationListener<T>> implements ListenerRegistration<NotificationListener<T>> {

    /** Notification type the listener was registered for. */
    @Property
    val Class<T> type;

    /** Owning broker; set to null once the registration has been removed. */
    var NotificationBrokerImpl notificationBroker;

    /**
     * @param type notification type the listener is registered for
     * @param instance the registered listener
     * @param broker broker that holds the registration
     */
    public new(Class<T> type, NotificationListener<T> instance, NotificationBrokerImpl broker) {
        super(instance);
        _type = type;
        notificationBroker = broker;
    }

    /**
     * Unregisters the listener from the broker.
     *
     * Safe to invoke more than once: after the first call the broker reference
     * is null and later calls do nothing instead of failing with a
     * NullPointerException.
     */
    override protected removeRegistration() {
        val broker = notificationBroker
        if (broker != null) {
            broker.unregisterListener(this);
            notificationBroker = null;
        }
    }
}
148
/**
 * Registration handle for a generated (YANG binding) notification listener.
 *
 * Removing the registration asks the owning broker to unregister the
 * invoker, drops the reference to the broker and closes the invoker.
 */
class GeneratedListenerRegistration extends AbstractObjectRegistration<org.opendaylight.yangtools.yang.binding.NotificationListener> implements ListenerRegistration<org.opendaylight.yangtools.yang.binding.NotificationListener> {

    /** Invoker created for the registered listener. */
    @Property
    val NotificationInvoker invoker;

    /** Owning broker; set to null once the registration has been removed. */
    var NotificationBrokerImpl notificationBroker;

    /**
     * @param instance the registered listener
     * @param invoker invoker created for the listener
     * @param broker broker that holds the registration
     */
    new(org.opendaylight.yangtools.yang.binding.NotificationListener instance, NotificationInvoker invoker, NotificationBrokerImpl broker) {
        super(instance);
        _invoker = invoker;
        notificationBroker = broker;
    }

    /**
     * Unregisters the listener from the broker and closes the invoker.
     *
     * Safe to invoke more than once: after the first call the broker reference
     * is null, so later calls neither fail with a NullPointerException nor
     * close the invoker a second time.
     */
    override protected removeRegistration() {
        val broker = notificationBroker
        if (broker != null) {
            broker.unregisterListener(this);
            notificationBroker = null;
            invoker.close();
        }
    }
}
169
/**
 * Unit of work that hands one notification to one listener.
 *
 * Any {@link Exception} raised by the listener is logged and not propagated,
 * so {@link #call()} always completes normally with {@code null}.
 */
@Data
class NotifyTask implements Callable<Object> {

    private static val log = LoggerFactory.getLogger(NotifyTask);

    /** Listener that receives the notification. */
    val NotificationListener listener;

    /** Notification passed to the listener. */
    val Notification notification;

    /**
     * Invokes the listener with the notification.
     *
     * @return always {@code null}
     */
    override call() {
        try {
            listener.onNotification(notification);
        } catch (Exception failure) {
            // Contain listener failures: log them rather than letting them escape the task.
            log.error("Unhandled exception thrown by listener: {}", listener, failure);
        }
        return null;
    }

}