Merge "Bug fixes for netconf southbound plugin."
[controller.git] / opendaylight / md-sal / sal-binding-broker / src / main / java / org / opendaylight / controller / sal / binding / impl / NotificationBrokerImpl.xtend
1 /*\r
2  * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.\r
3  *\r
4  * This program and the accompanying materials are made available under the\r
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,\r
6  * and is available at http://www.eclipse.org/legal/epl-v10.html\r
7  */\r
8 package org.opendaylight.controller.sal.binding.impl\r
9 \r
10 import com.google.common.collect.HashMultimap\r
11 import com.google.common.collect.Multimap\r
12 import java.util.Collection\r
13 import java.util.Collections\r
14 import java.util.concurrent.Callable\r
15 import java.util.concurrent.ExecutorService\r
16 import org.opendaylight.controller.sal.binding.api.NotificationListener\r
17 import org.opendaylight.controller.sal.binding.api.NotificationProviderService\r
18 import org.opendaylight.controller.sal.binding.spi.NotificationInvokerFactory.NotificationInvoker\r
19 import org.opendaylight.yangtools.concepts.AbstractObjectRegistration\r
20 import org.opendaylight.yangtools.concepts.ListenerRegistration\r
21 import org.opendaylight.yangtools.concepts.Registration\r
22 import org.opendaylight.yangtools.yang.binding.Notification\r
23 import org.slf4j.LoggerFactory\r
24 import org.opendaylight.controller.sal.binding.codegen.impl.SingletonHolder\rimport com.google.common.collect.Multimaps\r
25 import org.opendaylight.yangtools.concepts.util.ListenerRegistry\r
26 import org.opendaylight.controller.sal.binding.api.NotificationProviderService.NotificationInterestListener\rimport java.util.Set\r
27 import java.util.Set\r
28 import com.google.common.collect.ImmutableSet\r
29 import java.util.concurrent.Future\r
30 \r
/**
 * Notification broker which keeps a registry of notification listeners keyed
 * by notification type and delivers published notifications to them by
 * submitting one {@link NotifyTask} per listener to an {@link ExecutorService}.
 *
 * Listener registrations are stored in a Guava multimap wrapped by
 * {@code Multimaps.synchronizedSetMultimap}.
 */
class NotificationBrokerImpl implements NotificationProviderService, AutoCloseable {

    /** Listeners which are told about every notification type that gains a subscriber. */
    val ListenerRegistry<NotificationInterestListener> interestListeners = ListenerRegistry.create;

    /** Registered notification listeners, keyed by the notification type they subscribed to. */
    val Multimap<Class<? extends Notification>, NotificationListener<?>> listeners;

    /** Default executor used by {@link #publish(Notification)}. */
    @Property
    var ExecutorService executor;

    val logger = LoggerFactory.getLogger(NotificationBrokerImpl)

    /**
     * Creates a broker without a default executor; one has to be supplied
     * through the {@code executor} property before notifications are published
     * without an explicit executor.
     */
    new() {
        listeners = Multimaps.synchronizedSetMultimap(HashMultimap.create())
    }

    /**
     * Creates a broker using the supplied executor as the default one.
     *
     * @param executor executor used to deliver notifications
     */
    @Deprecated
    new(ExecutorService executor) {
        listeners = Multimaps.synchronizedSetMultimap(HashMultimap.create())
        this.executor = executor;
    }

    /**
     * Adds a listener for the given notification type. Unlike
     * {@code registerNotificationListener} this neither returns a registration
     * nor announces the subscription to interest listeners.
     */
    @Deprecated
    override <T extends Notification> addNotificationListener(Class<T> notificationType,
        NotificationListener<T> listener) {
        listeners.put(notificationType, listener)
    }

    /** Removes a listener previously stored for the given notification type. */
    @Deprecated
    override <T extends Notification> removeNotificationListener(Class<T> notificationType,
        NotificationListener<T> listener) {
        listeners.remove(notificationType, listener)
    }

    /** Delegates to {@link #publish(Notification)}. */
    override notify(Notification notification) {
        publish(notification)
    }

    /**
     * Returns the interfaces directly implemented by the class of the supplied
     * notification which are assignable to {@link Notification}, excluding
     * {@link Notification} itself.
     */
    def getNotificationTypes(Notification notification) {
        notification.class.interfaces.filter[it != Notification && Notification.isAssignableFrom(it)]
    }

    /** Synchronously invokes {@code onNotification} on every supplied listener. */
    @SuppressWarnings("unchecked")
    private def notifyAll(Collection<NotificationListener<?>> listeners, Notification notification) {
        listeners.forEach[(it as NotificationListener).onNotification(notification)]
    }

    /**
     * Not supported.
     *
     * @throws UnsupportedOperationException always
     */
    @Deprecated
    override addNotificationListener(org.opendaylight.yangtools.yang.binding.NotificationListener listener) {
        throw new UnsupportedOperationException("Deprecated method. Use registerNotificationListener instead.");

    }

    /**
     * Not supported.
     *
     * @throws UnsupportedOperationException always
     */
    @Deprecated
    override removeNotificationListener(org.opendaylight.yangtools.yang.binding.NotificationListener listener) {
        throw new UnsupportedOperationException(
            "Deprecated method. Use RegisterNotificationListener returned value to close registration.")
    }

    /** Delegates to {@link #publish(Notification, ExecutorService)}. */
    @Deprecated
    override notify(Notification notification, ExecutorService service) {
        publish(notification, service)
    }

    /** Publishes the notification using the broker's default executor. */
    override publish(Notification notification) {
        publish(notification, executor)
    }

    /**
     * Publishes the notification to every listener registered for any of its
     * notification types. One {@link NotifyTask} is created per listener and
     * submitted to the supplied executor.
     *
     * @param notification notification to deliver
     * @param service executor the delivery tasks are submitted to; when
     *        {@code null} the broker's default executor is used instead
     */
    override publish(Notification notification, ExecutorService service) {
        val allTypes = notification.notificationTypes

        var Iterable<NotificationListener<?>> listenerToNotify = Collections.emptySet();
        for (type : allTypes) {
            listenerToNotify = listenerToNotify + listeners.get(type as Class<? extends Notification>)
        }
        // Collecting into a set means a listener subscribed to several of the
        // notification's types is only notified once (NotifyTask is a @Data class).
        val tasks = listenerToNotify.map[new NotifyTask(it, notification)].toSet;
        // Honour the executor chosen by the caller. Earlier code always used the
        // default executor, so keep it as the fallback for callers passing null.
        submitAll(service ?: executor, tasks);
    }

    /**
     * Submits every task to the given executor.
     *
     * @return immutable set of the futures returned by the executor
     */
    def submitAll(ExecutorService service, Set<NotifyTask> tasks) {
        val ret = ImmutableSet.<Future<Object>>builder();
        for(task : tasks) {
            ret.add(service.submit(task));
        }
        return ret.build();
    }

    /**
     * Registers a listener for the given notification type and announces the
     * subscription to all interest listeners.
     *
     * @return registration which removes the listener again when closed
     */
    override <T extends Notification> registerNotificationListener(Class<T> notificationType,
        NotificationListener<T> listener) {
        val reg = new GenericNotificationRegistration<T>(notificationType, listener, this);
        listeners.put(notificationType, listener);
        announceNotificationSubscription(notificationType);
        return reg;
    }

    /**
     * Tells every registered interest listener that the given notification type
     * has a subscriber. A failing interest listener is logged and does not
     * prevent the remaining ones from being called.
     */
    def announceNotificationSubscription(Class<? extends Notification> notification) {
        for (listener : interestListeners) {
            try {
                listener.instance.onNotificationSubscribtion(notification);
            } catch (Exception e) {
                // Pass the exception itself as the last argument so that the
                // stack trace ends up in the log, not just (nothing of) the message.
                logger.error("Unhandled exception thrown by interest listener: {}", listener.instance, e)
            }
        }
    }

    /**
     * Registers a generated-interface listener. An invoker is obtained for the
     * listener and its invocation proxy is stored for every notification type
     * the invoker supports; each subscription is announced to interest listeners.
     *
     * @return registration which removes the proxy again when closed
     */
    override registerNotificationListener(
        org.opendaylight.yangtools.yang.binding.NotificationListener listener) {
        val invoker = SingletonHolder.INVOKER_FACTORY.invokerFor(listener);
        for (notifyType : invoker.supportedNotifications) {
            listeners.put(notifyType, invoker.invocationProxy)
            announceNotificationSubscription(notifyType)
        }
        val registration = new GeneratedListenerRegistration(listener, invoker,this);
        return registration as Registration<org.opendaylight.yangtools.yang.binding.NotificationListener>;
    }

    /** Removes the listener held by the given registration from the registry. */
    protected def unregisterListener(GenericNotificationRegistration<?> reg) {
        listeners.remove(reg.type, reg.instance);
    }

    /** Removes the invocation proxy of the given registration for every supported notification type. */
    protected def unregisterListener(GeneratedListenerRegistration reg) {
        for (notifyType : reg.invoker.supportedNotifications) {
            listeners.remove(notifyType, reg.invoker.invocationProxy)
        }
    }

    /** Currently a no-op. */
    override close()  {
        //FIXME: implement properly.
    }

    /**
     * Registers an interest listener and immediately replays to it every
     * notification type which currently has at least one listener.
     *
     * @return registration of the interest listener
     */
    override registerInterestListener(NotificationInterestListener interestListener) {
        val registration = interestListeners.register(interestListener);

        // NOTE(review): Guava documents that collection views of a synchronized
        // multimap should be iterated while holding the multimap's lock; this
        // loop does not do so - confirm whether concurrent registration can occur here.
        for(notification : listeners.keySet) {
            interestListener.onNotificationSubscribtion(notification);
        }
        return registration
    }
}
169 \r
/**
 * Registration handle for a {@link NotificationListener} subscribed to a single
 * notification type. Removing the registration unregisters the listener from
 * the broker which created it.
 */
class GenericNotificationRegistration<T extends Notification> extends AbstractObjectRegistration<NotificationListener<T>> implements ListenerRegistration<NotificationListener<T>> {

    /** Notification type the listener was registered for. */
    @Property
    val Class<T> type;

    /** Owning broker; reset to null once the registration has been removed. */
    var NotificationBrokerImpl notificationBroker;

    /**
     * @param type notification type the listener subscribed to
     * @param instance the registered listener
     * @param broker broker holding the registration
     */
    public new(Class<T> type, NotificationListener<T> instance, NotificationBrokerImpl broker) {
        super(instance);
        this.notificationBroker = broker;
        this._type = type;
    }

    /** Unregisters this registration from the broker and drops the broker reference. */
    override protected removeRegistration() {
        this.notificationBroker.unregisterListener(this);
        this.notificationBroker = null;
    }
}
188 \r
/**
 * Registration handle for a generated-interface notification listener which is
 * dispatched to through a {@link NotificationInvoker}. Removing the
 * registration unregisters it from the broker and closes the invoker.
 */
class GeneratedListenerRegistration extends AbstractObjectRegistration<org.opendaylight.yangtools.yang.binding.NotificationListener> implements ListenerRegistration<org.opendaylight.yangtools.yang.binding.NotificationListener> {

    /** Invoker created for the registered listener. */
    @Property
    val NotificationInvoker invoker;

    /** Owning broker; reset to null once the registration has been removed. */
    var NotificationBrokerImpl notificationBroker;

    /**
     * @param instance the registered listener
     * @param invoker invoker used to dispatch notifications to the listener
     * @param broker broker holding the registration
     */
    new(org.opendaylight.yangtools.yang.binding.NotificationListener instance, NotificationInvoker invoker, NotificationBrokerImpl broker) {
        super(instance);
        this.notificationBroker = broker;
        this._invoker = invoker;
    }

    /**
     * Unregisters this registration from the broker, drops the broker
     * reference and finally closes the invoker.
     */
    override protected removeRegistration() {
        this.notificationBroker.unregisterListener(this);
        this.notificationBroker = null;
        this.invoker.close();
    }
}
209 \r
/**
 * Task delivering a single notification to a single listener. Exceptions
 * thrown by the listener are logged and not propagated.
 */
@Data
class NotifyTask implements Callable<Object> {

    private static val log = LoggerFactory.getLogger(NotifyTask);

    /** Listener the notification is delivered to. */
    @SuppressWarnings("rawtypes")
    val NotificationListener listener;

    /** Notification to deliver. */
    val Notification notification;

    /**
     * Delivers the notification to the listener, logging before and after the
     * delivery.
     *
     * @return always {@code null}
     */
    override call() {
        try {
            logProgress("Delivering notification {} to {}");
            listener.onNotification(notification);
            logProgress("Notification delivered {} to {}");
        } catch (Exception e) {
            log.error("Unhandled exception thrown by listener: {}", listener, e);
        }
        return null;
    }

    /**
     * Logs a progress message: the complete notification at debug level when
     * debug logging is enabled, otherwise only its class name at trace level.
     */
    private def void logProgress(String template) {
        if (log.isDebugEnabled) {
            log.debug(template, notification, listener);
        } else {
            log.trace(template, notification.class.name, listener);
        }
    }

}