49d51025dd174d6980a6a7834a4c75284c1810e8
[controller.git] / opendaylight / md-sal / sal-binding-broker / src / main / java / org / opendaylight / controller / sal / binding / impl / NotificationBrokerImpl.java
1 /**
2  * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.sal.binding.impl;
9
10 import java.util.Collections;
11 import java.util.Set;
12 import java.util.concurrent.ExecutorService;
13
14 import org.eclipse.xtext.xbase.lib.Conversions;
15 import org.eclipse.xtext.xbase.lib.Functions.Function1;
16 import org.eclipse.xtext.xbase.lib.IterableExtensions;
17 import org.opendaylight.controller.sal.binding.api.NotificationListener;
18 import org.opendaylight.controller.sal.binding.api.NotificationProviderService;
19 import org.opendaylight.controller.sal.binding.codegen.impl.SingletonHolder;
20 import org.opendaylight.controller.sal.binding.spi.NotificationInvokerFactory.NotificationInvoker;
21 import org.opendaylight.yangtools.concepts.ListenerRegistration;
22 import org.opendaylight.yangtools.concepts.Registration;
23 import org.opendaylight.yangtools.concepts.util.ListenerRegistry;
24 import org.opendaylight.yangtools.yang.binding.Notification;
25 import org.slf4j.Logger;
26 import org.slf4j.LoggerFactory;
27
28 import com.google.common.base.Preconditions;
29 import com.google.common.collect.HashMultimap;
30 import com.google.common.collect.Iterables;
31 import com.google.common.collect.Multimap;
32 import com.google.common.collect.Multimaps;
33
34 public class NotificationBrokerImpl implements NotificationProviderService, AutoCloseable {
35     private static final Logger LOG = LoggerFactory.getLogger(NotificationBrokerImpl.class);
36
37     private final ListenerRegistry<NotificationInterestListener> interestListeners =
38             ListenerRegistry.create();
39
40     private final Multimap<Class<? extends Notification>, NotificationListener<?>> listeners =
41             Multimaps.synchronizedSetMultimap(HashMultimap.<Class<? extends Notification>, NotificationListener<?>>create());
42     private ExecutorService executor;
43
44     @Deprecated
45     public NotificationBrokerImpl(final ExecutorService executor) {
46         this.setExecutor(executor);
47     }
48
49     public void setExecutor(final ExecutorService executor) {
50         this.executor = Preconditions.checkNotNull(executor);
51     }
52
53     public Iterable<Class<?>> getNotificationTypes(final Notification notification) {
54         Class<?>[] _interfaces = notification.getClass().getInterfaces();
55         final Function1<Class<?>, Boolean> _function = new Function1<Class<?>, Boolean>() {
56             @Override
57             public Boolean apply(final Class<?> it) {
58                 if (Notification.class.equals(it)) {
59                     return false;
60                 }
61                 return Notification.class.isAssignableFrom(it);
62             }
63         };
64         return IterableExtensions.filter(((Iterable<Class<?>>)Conversions.doWrapArray(_interfaces)), _function);
65     }
66
67     @Override
68     public void publish(final Notification notification) {
69         this.publish(notification, executor);
70     }
71
72     @Override
73     public void publish(final Notification notification, final ExecutorService service) {
74         Iterable<NotificationListener<?>> listenerToNotify = Collections.emptySet();
75         for (final Class<?> type : getNotificationTypes(notification)) {
76             listenerToNotify = Iterables.concat(listenerToNotify, listeners.get(((Class<? extends Notification>) type)));
77         }
78         final Function1<NotificationListener<?>,NotifyTask> _function = new Function1<NotificationListener<?>, NotifyTask>() {
79             @Override
80             public NotifyTask apply(final NotificationListener<?> it) {
81                 return new NotifyTask(it, notification);
82             }
83         };
84         final Set<NotifyTask> tasks = IterableExtensions.<NotifyTask>toSet(
85                 IterableExtensions.<NotificationListener<?>, NotifyTask>map(listenerToNotify, _function));
86
87         for (final NotifyTask task : tasks) {
88             service.submit(task);
89         }
90     }
91
92     @Override
93     public <T extends Notification> Registration<NotificationListener<T>> registerNotificationListener(final Class<T> notificationType, final NotificationListener<T> listener) {
94         final GenericNotificationRegistration<T> reg = new GenericNotificationRegistration<T>(notificationType, listener, this);
95         this.listeners.put(notificationType, listener);
96         this.announceNotificationSubscription(notificationType);
97         return reg;
98     }
99
100     private void announceNotificationSubscription(final Class<? extends Notification> notification) {
101         for (final ListenerRegistration<NotificationInterestListener> listener : interestListeners) {
102             try {
103                 listener.getInstance().onNotificationSubscribtion(notification);
104             } catch (Exception e) {
105                 LOG.warn("Listener {} reported unexpected error on notification {}",
106                         listener.getInstance(), notification, e);
107             }
108         }
109     }
110
111     @Override
112     public Registration<org.opendaylight.yangtools.yang.binding.NotificationListener> registerNotificationListener(final org.opendaylight.yangtools.yang.binding.NotificationListener listener) {
113         final NotificationInvoker invoker = SingletonHolder.INVOKER_FACTORY.invokerFor(listener);
114         for (final Class<? extends Notification> notifyType : invoker.getSupportedNotifications()) {
115             listeners.put(notifyType, invoker.getInvocationProxy());
116             announceNotificationSubscription(notifyType);
117         }
118
119         return new GeneratedListenerRegistration(listener, invoker, this);
120     }
121
122     protected boolean unregisterListener(final GenericNotificationRegistration<?> reg) {
123         return listeners.remove(reg.getType(), reg.getInstance());
124     }
125
126     protected void unregisterListener(final GeneratedListenerRegistration reg) {
127         final NotificationInvoker invoker = reg.getInvoker();
128         for (final Class<? extends Notification> notifyType : invoker.getSupportedNotifications()) {
129             this.listeners.remove(notifyType, invoker.getInvocationProxy());
130         }
131     }
132
133     @Override
134     public void close() {
135     }
136
137     @Override
138     public ListenerRegistration<NotificationInterestListener> registerInterestListener(final NotificationInterestListener interestListener) {
139         final ListenerRegistration<NotificationInterestListener> registration = this.interestListeners.register(interestListener);
140         for (final Class<? extends Notification> notification : listeners.keySet()) {
141             interestListener.onNotificationSubscribtion(notification);
142         }
143         return registration;
144     }
145 }