7d844b3bf578c1958b3849e4b4d6841b2fd24ddb
[controller.git] / opendaylight / md-sal / sal-binding-broker / src / main / java / org / opendaylight / controller / sal / binding / impl / NotificationBrokerImpl.java
1 /**
2  * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.sal.binding.impl;
9
10 import java.util.Arrays;
11 import java.util.Collection;
12 import java.util.HashSet;
13 import java.util.Set;
14 import java.util.concurrent.ExecutorService;
15
16 import org.opendaylight.controller.sal.binding.api.NotificationListener;
17 import org.opendaylight.controller.sal.binding.api.NotificationProviderService;
18 import org.opendaylight.controller.sal.binding.codegen.impl.SingletonHolder;
19 import org.opendaylight.controller.sal.binding.spi.NotificationInvokerFactory.NotificationInvoker;
20 import org.opendaylight.yangtools.concepts.AbstractListenerRegistration;
21 import org.opendaylight.yangtools.concepts.ListenerRegistration;
22 import org.opendaylight.yangtools.concepts.util.ListenerRegistry;
23 import org.opendaylight.yangtools.yang.binding.Notification;
24 import org.slf4j.Logger;
25 import org.slf4j.LoggerFactory;
26
27 import com.google.common.base.Preconditions;
28 import com.google.common.base.Predicate;
29 import com.google.common.collect.HashMultimap;
30 import com.google.common.collect.Iterables;
31 import com.google.common.collect.Multimap;
32 import com.google.common.collect.Multimaps;
33
34 public class NotificationBrokerImpl implements NotificationProviderService, AutoCloseable {
35     private static final Logger LOG = LoggerFactory.getLogger(NotificationBrokerImpl.class);
36
37     private final ListenerRegistry<NotificationInterestListener> interestListeners =
38             ListenerRegistry.create();
39
40     private final Multimap<Class<? extends Notification>, NotificationListenerRegistration<?>> listeners =
41             Multimaps.synchronizedSetMultimap(HashMultimap.<Class<? extends Notification>, NotificationListenerRegistration<?>>create());
42     private ExecutorService executor;
43
44     @Deprecated
45     public NotificationBrokerImpl(final ExecutorService executor) {
46         this.setExecutor(executor);
47     }
48
49     public void setExecutor(final ExecutorService executor) {
50         this.executor = Preconditions.checkNotNull(executor);
51     }
52
53     public Iterable<Class<?>> getNotificationTypes(final Notification notification) {
54         final Class<?>[] ifaces = notification.getClass().getInterfaces();
55         return Iterables.filter(Arrays.asList(ifaces), new Predicate<Class<?>>() {
56             @Override
57             public boolean apply(final Class<?> input) {
58                 if (Notification.class.equals(input)) {
59                     return false;
60                 }
61                 return Notification.class.isAssignableFrom(input);
62             }
63         });
64     }
65
66     @Override
67     public void publish(final Notification notification) {
68         this.publish(notification, executor);
69     }
70
71     @Override
72     public void publish(final Notification notification, final ExecutorService service) {
73         final Set<NotificationListenerRegistration<?>> toNotify = new HashSet<>();
74
75         for (final Class<?> type : getNotificationTypes(notification)) {
76             final Collection<NotificationListenerRegistration<?>> l = listeners.get((Class<? extends Notification>) type);
77             if (l != null) {
78                 toNotify.addAll(l);
79             }
80         }
81
82         for (NotificationListenerRegistration<?> r : toNotify) {
83             service.submit(new NotifyTask(r, notification));
84         }
85     }
86
87     private void addRegistrations(final NotificationListenerRegistration<?>... registrations) {
88         for (NotificationListenerRegistration<?> reg : registrations) {
89             listeners.put(reg.getType(), reg);
90             this.announceNotificationSubscription(reg.getType());
91         }
92     }
93
94     void removeRegistrations(final NotificationListenerRegistration<?>... registrations) {
95         for (NotificationListenerRegistration<?> reg : registrations) {
96             listeners.remove(reg.getType(), reg);
97         }
98     }
99
100     @Override
101     public <T extends Notification> NotificationListenerRegistration<T> registerNotificationListener(final Class<T> notificationType, final NotificationListener<T> listener) {
102         final NotificationListenerRegistration<T> reg = new AbstractNotificationListenerRegistration<T>(notificationType, listener) {
103             @Override
104             protected void removeRegistration() {
105                 removeRegistrations(this);
106             }
107         };
108
109         addRegistrations(reg);
110         return reg;
111     }
112
113     private void announceNotificationSubscription(final Class<? extends Notification> notification) {
114         for (final ListenerRegistration<NotificationInterestListener> listener : interestListeners) {
115             try {
116                 listener.getInstance().onNotificationSubscribtion(notification);
117             } catch (Exception e) {
118                 LOG.warn("Listener {} reported unexpected error on notification {}",
119                         listener.getInstance(), notification, e);
120             }
121         }
122     }
123
124     @Override
125     public ListenerRegistration<org.opendaylight.yangtools.yang.binding.NotificationListener> registerNotificationListener(final org.opendaylight.yangtools.yang.binding.NotificationListener listener) {
126         final NotificationInvoker invoker = SingletonHolder.INVOKER_FACTORY.invokerFor(listener);
127         final Set<Class<? extends Notification>> types = invoker.getSupportedNotifications();
128         final NotificationListenerRegistration<?>[] regs = new NotificationListenerRegistration<?>[types.size()];
129
130         // Populate the registrations...
131         int i = 0;
132         for (Class<? extends Notification> type : types) {
133             regs[i] = new AggregatedNotificationListenerRegistration<Notification, Object>(type, invoker.getInvocationProxy(), regs) {
134                 @Override
135                 protected void removeRegistration() {
136                     // Nothing to do, will be cleaned up by parent (below)
137                 }
138             };
139             ++i;
140         }
141
142         // ... now put them to use ...
143         addRegistrations(regs);
144
145         // ... finally return the parent registration
146         return new AbstractListenerRegistration<org.opendaylight.yangtools.yang.binding.NotificationListener>(listener) {
147             @Override
148             protected void removeRegistration() {
149                 removeRegistrations(regs);
150                 for (ListenerRegistration<?> reg : regs) {
151                     reg.close();
152                 }
153             }
154         };
155     }
156
157     @Override
158     public void close() {
159     }
160
161     @Override
162     public ListenerRegistration<NotificationInterestListener> registerInterestListener(final NotificationInterestListener interestListener) {
163         final ListenerRegistration<NotificationInterestListener> registration = this.interestListeners.register(interestListener);
164         for (final Class<? extends Notification> notification : listeners.keySet()) {
165             interestListener.onNotificationSubscribtion(notification);
166         }
167         return registration;
168     }
169 }