Merge "HostTracker hosts DB key scheme implementation"
[controller.git] / opendaylight / md-sal / sal-binding-broker / src / main / java / org / opendaylight / controller / sal / binding / impl / NotificationBrokerImpl.xtend
1 /*
2  * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.sal.binding.impl
9
10 import org.opendaylight.controller.sal.binding.api.NotificationProviderService
11 import org.opendaylight.yangtools.yang.binding.Notification
12 import com.google.common.collect.Multimap
13 import org.opendaylight.controller.sal.binding.api.NotificationListener
14 import com.google.common.collect.HashMultimap
15 import java.util.concurrent.ExecutorService
16 import java.util.Collection
17 import org.opendaylight.yangtools.concepts.Registration
18 import org.opendaylight.controller.sal.binding.codegen.RuntimeCodeGenerator
19 import org.opendaylight.controller.sal.binding.spi.NotificationInvokerFactory
20 import org.opendaylight.yangtools.concepts.ListenerRegistration
21 import org.opendaylight.controller.sal.binding.spi.NotificationInvokerFactory.NotificationInvoker
22 import org.opendaylight.yangtools.concepts.AbstractObjectRegistration
23 import java.util.Collections
24 import org.slf4j.LoggerFactory
25 import java.util.concurrent.Callable
26
27 class NotificationBrokerImpl implements NotificationProviderService, AutoCloseable {
28
29     val Multimap<Class<? extends Notification>, NotificationListener<?>> listeners;
30
31     @Property
32     var ExecutorService executor;
33
34     new(ExecutorService executor) {
35         listeners = HashMultimap.create()
36         this.executor = executor;
37     }
38
39     @Deprecated
40     override <T extends Notification> addNotificationListener(Class<T> notificationType,
41         NotificationListener<T> listener) {
42         listeners.put(notificationType, listener)
43     }
44
45     @Deprecated
46     override <T extends Notification> removeNotificationListener(Class<T> notificationType,
47         NotificationListener<T> listener) {
48         listeners.remove(notificationType, listener)
49     }
50
51     override notify(Notification notification) {
52         publish(notification)
53     }
54
55     def getNotificationTypes(Notification notification) {
56         notification.class.interfaces.filter[it != Notification && Notification.isAssignableFrom(it)]
57     }
58
59     @SuppressWarnings("unchecked")
60     private def notifyAll(Collection<NotificationListener<?>> listeners, Notification notification) {
61         listeners.forEach[(it as NotificationListener).onNotification(notification)]
62     }
63
64     @Deprecated
65     override addNotificationListener(org.opendaylight.yangtools.yang.binding.NotificationListener listener) {
66         throw new UnsupportedOperationException("Deprecated method. Use registerNotificationListener instead.");
67
68     }
69
70     @Deprecated
71     override removeNotificationListener(org.opendaylight.yangtools.yang.binding.NotificationListener listener) {
72         throw new UnsupportedOperationException(
73             "Deprecated method. Use RegisterNotificationListener returned value to close registration.")
74     }
75
76     @Deprecated
77     override notify(Notification notification, ExecutorService service) {
78         publish(notification, service)
79     }
80
81     override publish(Notification notification) {
82         publish(notification, executor)
83     }
84
85     override publish(Notification notification, ExecutorService service) {
86         val allTypes = notification.notificationTypes
87
88         var Iterable<NotificationListener<?>> listenerToNotify = Collections.emptySet();
89         for (type : allTypes) {
90             listenerToNotify = listenerToNotify + listeners.get(type as Class<? extends Notification>)
91         }
92         val tasks = listenerToNotify.map[new NotifyTask(it, notification)].toSet;
93         executor.invokeAll(tasks);
94     }
95
96     override <T extends Notification> registerNotificationListener(Class<T> notificationType,
97         NotificationListener<T> listener) {
98         val reg = new GenericNotificationRegistration<T>(notificationType, listener, this);
99         listeners.put(notificationType, listener);
100         return reg;
101     }
102
103     override registerNotificationListener(
104         org.opendaylight.yangtools.yang.binding.NotificationListener listener) {
105         val invoker = BindingAwareBrokerImpl.generator.invokerFactory.invokerFor(listener);
106         for (notifyType : invoker.supportedNotifications) {
107             listeners.put(notifyType, invoker.invocationProxy)
108         }
109         val registration = new GeneratedListenerRegistration(listener, invoker,this);
110         return registration as Registration<org.opendaylight.yangtools.yang.binding.NotificationListener>;
111     }
112
113     protected def unregisterListener(GenericNotificationRegistration<?> reg) {
114         listeners.remove(reg.type, reg.instance);
115     }
116
117     protected def unregisterListener(GeneratedListenerRegistration reg) {
118         for (notifyType : reg.invoker.supportedNotifications) {
119             listeners.remove(notifyType, reg.invoker.invocationProxy)
120         }
121     }
122     
123     override close()  {
124         //FIXME: implement properly.
125     }
126     
127 }
128
129 class GenericNotificationRegistration<T extends Notification> extends AbstractObjectRegistration<NotificationListener<T>> implements ListenerRegistration<NotificationListener<T>> {
130
131     @Property
132     val Class<T> type;
133
134     var NotificationBrokerImpl notificationBroker;
135
136     public new(Class<T> type, NotificationListener<T> instance, NotificationBrokerImpl broker) {
137         super(instance);
138         _type = type;
139         notificationBroker = broker;
140     }
141
142     override protected removeRegistration() {
143         notificationBroker.unregisterListener(this);
144         notificationBroker = null;
145     }
146 }
147
148 class GeneratedListenerRegistration extends AbstractObjectRegistration<org.opendaylight.yangtools.yang.binding.NotificationListener> implements ListenerRegistration<org.opendaylight.yangtools.yang.binding.NotificationListener> {
149
150     @Property
151     val NotificationInvoker invoker;
152     
153     var NotificationBrokerImpl notificationBroker;
154     
155
156     new(org.opendaylight.yangtools.yang.binding.NotificationListener instance, NotificationInvoker invoker, NotificationBrokerImpl broker) {
157         super(instance);
158         _invoker = invoker;
159         notificationBroker = broker;
160     }
161
162     override protected removeRegistration() {
163         notificationBroker.unregisterListener(this);
164         notificationBroker = null;
165         invoker.close();
166     }
167 }
168
169 @Data
170 class NotifyTask implements Callable<Object> {
171
172     private static val log = LoggerFactory.getLogger(NotifyTask);
173
174     val NotificationListener listener;
175     val Notification notification;
176
177     override call() {
178         try {
179             log.info("Delivering notification {} to {}",notification,listener);
180             listener.onNotification(notification);
181             log.info("Notification delivered {} to {}",notification,listener);
182         } catch (Exception e) {
183             log.error("Unhandled exception thrown by listener: {}", listener, e);
184         }
185         return null;
186     }
187
188 }