5c7d924d340c07d2c6ecf76030d086c128835391
[controller.git] / opendaylight / md-sal / sal-binding-broker / src / main / java / org / opendaylight / controller / sal / binding / impl / NotificationBrokerImpl.java
1 /**
2  * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.sal.binding.impl;
9
10 import java.util.Collections;
11 import java.util.Set;
12 import java.util.concurrent.ExecutorService;
13 import java.util.concurrent.Future;
14
15 import org.eclipse.xtext.xbase.lib.Conversions;
16 import org.eclipse.xtext.xbase.lib.Functions.Function1;
17 import org.eclipse.xtext.xbase.lib.IterableExtensions;
18 import org.opendaylight.controller.sal.binding.api.NotificationListener;
19 import org.opendaylight.controller.sal.binding.api.NotificationProviderService;
20 import org.opendaylight.controller.sal.binding.codegen.impl.SingletonHolder;
21 import org.opendaylight.controller.sal.binding.spi.NotificationInvokerFactory.NotificationInvoker;
22 import org.opendaylight.yangtools.concepts.ListenerRegistration;
23 import org.opendaylight.yangtools.concepts.Registration;
24 import org.opendaylight.yangtools.concepts.util.ListenerRegistry;
25 import org.opendaylight.yangtools.yang.binding.Notification;
26 import org.slf4j.Logger;
27 import org.slf4j.LoggerFactory;
28
29 import com.google.common.base.Preconditions;
30 import com.google.common.collect.HashMultimap;
31 import com.google.common.collect.ImmutableSet;
32 import com.google.common.collect.ImmutableSet.Builder;
33 import com.google.common.collect.Iterables;
34 import com.google.common.collect.Multimap;
35 import com.google.common.collect.Multimaps;
36
37 public class NotificationBrokerImpl implements NotificationProviderService, AutoCloseable {
38     private static final Logger LOG = LoggerFactory.getLogger(NotificationBrokerImpl.class);
39
40     private final ListenerRegistry<NotificationInterestListener> interestListeners =
41             ListenerRegistry.create();
42
43     private final Multimap<Class<? extends Notification>, NotificationListener<?>> listeners =
44             Multimaps.synchronizedSetMultimap(HashMultimap.<Class<? extends Notification>, NotificationListener<?>>create());
45     private ExecutorService executor;
46
47     @Deprecated
48     public NotificationBrokerImpl(final ExecutorService executor) {
49         this.setExecutor(executor);
50     }
51
52     public void setExecutor(final ExecutorService executor) {
53         this.executor = Preconditions.checkNotNull(executor);
54     }
55
56     public Iterable<Class<?>> getNotificationTypes(final Notification notification) {
57         Class<?>[] _interfaces = notification.getClass().getInterfaces();
58         final Function1<Class<?>, Boolean> _function = new Function1<Class<?>, Boolean>() {
59             @Override
60             public Boolean apply(final Class<?> it) {
61                 if (Notification.class.equals(it)) {
62                     return false;
63                 }
64                 return Notification.class.isAssignableFrom(it);
65             }
66         };
67         return IterableExtensions.filter(((Iterable<Class<?>>)Conversions.doWrapArray(_interfaces)), _function);
68     }
69
70     @Override
71     public void publish(final Notification notification) {
72         this.publish(notification, executor);
73     }
74
75     @Override
76     public void publish(final Notification notification, final ExecutorService service) {
77         Iterable<NotificationListener<?>> listenerToNotify = Collections.emptySet();
78         for (final Class<?> type : getNotificationTypes(notification)) {
79             listenerToNotify = Iterables.concat(listenerToNotify, listeners.get(((Class<? extends Notification>) type)));
80         }
81         final Function1<NotificationListener<?>,NotifyTask> _function = new Function1<NotificationListener<?>, NotifyTask>() {
82             @Override
83             public NotifyTask apply(final NotificationListener<?> it) {
84                 return new NotifyTask(it, notification);
85             }
86         };
87         final Set<NotifyTask> tasks = IterableExtensions.<NotifyTask>toSet(
88                 IterableExtensions.<NotificationListener<?>, NotifyTask>map(listenerToNotify, _function));
89         this.submitAll(executor, tasks);
90     }
91
92     private ImmutableSet<Future<Object>> submitAll(final ExecutorService service, final Set<NotifyTask> tasks) {
93         final Builder<Future<Object>> ret = ImmutableSet.<Future<Object>>builder();
94         for (final NotifyTask task : tasks) {
95             ret.add(service.submit(task));
96         }
97         return ret.build();
98     }
99
100     @Override
101     public <T extends Notification> Registration<NotificationListener<T>> registerNotificationListener(final Class<T> notificationType, final NotificationListener<T> listener) {
102         final GenericNotificationRegistration<T> reg = new GenericNotificationRegistration<T>(notificationType, listener, this);
103         this.listeners.put(notificationType, listener);
104         this.announceNotificationSubscription(notificationType);
105         return reg;
106     }
107
108     private void announceNotificationSubscription(final Class<? extends Notification> notification) {
109         for (final ListenerRegistration<NotificationInterestListener> listener : interestListeners) {
110             try {
111                 listener.getInstance().onNotificationSubscribtion(notification);
112             } catch (Exception e) {
113                 LOG.warn("Listener {} reported unexpected error on notification {}",
114                         listener.getInstance(), notification, e);
115             }
116         }
117     }
118
119     @Override
120     public Registration<org.opendaylight.yangtools.yang.binding.NotificationListener> registerNotificationListener(final org.opendaylight.yangtools.yang.binding.NotificationListener listener) {
121         final NotificationInvoker invoker = SingletonHolder.INVOKER_FACTORY.invokerFor(listener);
122         for (final Class<? extends Notification> notifyType : invoker.getSupportedNotifications()) {
123             listeners.put(notifyType, invoker.getInvocationProxy());
124             announceNotificationSubscription(notifyType);
125         }
126
127         return new GeneratedListenerRegistration(listener, invoker, this);
128     }
129
130     protected boolean unregisterListener(final GenericNotificationRegistration<?> reg) {
131         return listeners.remove(reg.getType(), reg.getInstance());
132     }
133
134     protected void unregisterListener(final GeneratedListenerRegistration reg) {
135         final NotificationInvoker invoker = reg.getInvoker();
136         for (final Class<? extends Notification> notifyType : invoker.getSupportedNotifications()) {
137             this.listeners.remove(notifyType, invoker.getInvocationProxy());
138         }
139     }
140
141     @Override
142     public void close() {
143     }
144
145     @Override
146     public ListenerRegistration<NotificationInterestListener> registerInterestListener(final NotificationInterestListener interestListener) {
147         final ListenerRegistration<NotificationInterestListener> registration = this.interestListeners.register(interestListener);
148         for (final Class<? extends Notification> notification : listeners.keySet()) {
149             interestListener.onNotificationSubscribtion(notification);
150         }
151         return registration;
152     }
153 }