9efd48eabdb9f6889bc45551f38247f37cf9d048
[controller.git] / opendaylight / md-sal / sal-binding-broker / src / main / java / org / opendaylight / controller / sal / binding / impl / NotificationBrokerImpl.java
1 /**
2  * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.sal.binding.impl;
9
10 import java.util.Collection;
11 import java.util.Collections;
12 import java.util.Set;
13 import java.util.concurrent.ExecutorService;
14 import java.util.concurrent.Future;
15
16 import org.eclipse.xtext.xbase.lib.Conversions;
17 import org.eclipse.xtext.xbase.lib.Functions.Function1;
18 import org.eclipse.xtext.xbase.lib.IterableExtensions;
19 import org.opendaylight.controller.sal.binding.api.NotificationListener;
20 import org.opendaylight.controller.sal.binding.api.NotificationProviderService;
21 import org.opendaylight.controller.sal.binding.codegen.impl.SingletonHolder;
22 import org.opendaylight.controller.sal.binding.spi.NotificationInvokerFactory.NotificationInvoker;
23 import org.opendaylight.yangtools.concepts.ListenerRegistration;
24 import org.opendaylight.yangtools.concepts.Registration;
25 import org.opendaylight.yangtools.concepts.util.ListenerRegistry;
26 import org.opendaylight.yangtools.yang.binding.Notification;
27 import org.slf4j.Logger;
28 import org.slf4j.LoggerFactory;
29
30 import com.google.common.base.Objects;
31 import com.google.common.base.Preconditions;
32 import com.google.common.collect.HashMultimap;
33 import com.google.common.collect.ImmutableSet;
34 import com.google.common.collect.ImmutableSet.Builder;
35 import com.google.common.collect.Iterables;
36 import com.google.common.collect.Multimap;
37 import com.google.common.collect.Multimaps;
38 import com.google.common.collect.SetMultimap;
39
40 public class NotificationBrokerImpl implements NotificationProviderService, AutoCloseable {
41     private static final Logger LOG = LoggerFactory.getLogger(NotificationBrokerImpl.class);
42
43     private final ListenerRegistry<NotificationInterestListener> interestListeners =
44             ListenerRegistry.create();
45
46     private final Multimap<Class<? extends Notification>,NotificationListener<? extends Object>> listeners;
47     private ExecutorService executor;
48
49     public NotificationBrokerImpl() {
50         HashMultimap<Class<? extends Notification>,NotificationListener<? extends Object>> _create = HashMultimap.<Class<? extends Notification>, NotificationListener<? extends Object>>create();
51         SetMultimap<Class<? extends Notification>,NotificationListener<? extends Object>> _synchronizedSetMultimap = Multimaps.<Class<? extends Notification>, NotificationListener<? extends Object>>synchronizedSetMultimap(_create);
52         this.listeners = _synchronizedSetMultimap;
53     }
54
55     @Deprecated
56     public NotificationBrokerImpl(final ExecutorService executor) {
57         HashMultimap<Class<? extends Notification>,NotificationListener<? extends Object>> _create = HashMultimap.<Class<? extends Notification>, NotificationListener<? extends Object>>create();
58         SetMultimap<Class<? extends Notification>,NotificationListener<? extends Object>> _synchronizedSetMultimap = Multimaps.<Class<? extends Notification>, NotificationListener<? extends Object>>synchronizedSetMultimap(_create);
59         this.listeners = _synchronizedSetMultimap;
60         this.setExecutor(executor);
61     }
62
63     public void setExecutor(final ExecutorService executor) {
64         this.executor = Preconditions.checkNotNull(executor);
65     }
66
67     public Iterable<Class<? extends Object>> getNotificationTypes(final Notification notification) {
68         Class<? extends Notification> _class = notification.getClass();
69         Class<? extends Object>[] _interfaces = _class.getInterfaces();
70         final Function1<Class<? extends Object>,Boolean> _function = new Function1<Class<? extends Object>,Boolean>() {
71             @Override
72             public Boolean apply(final Class<? extends Object> it) {
73                 boolean _and = false;
74                 boolean _notEquals = (!Objects.equal(it, Notification.class));
75                 if (!_notEquals) {
76                     _and = false;
77                 } else {
78                     boolean _isAssignableFrom = Notification.class.isAssignableFrom(it);
79                     _and = (_notEquals && _isAssignableFrom);
80                 }
81                 return Boolean.valueOf(_and);
82             }
83         };
84         Iterable<Class<? extends Object>> _filter = IterableExtensions.<Class<? extends Object>>filter(((Iterable<Class<? extends Object>>)Conversions.doWrapArray(_interfaces)), _function);
85         return _filter;
86     }
87
88     @Override
89     public void publish(final Notification notification) {
90         this.publish(notification, executor);
91     }
92
93     @Override
94     public void publish(final Notification notification, final ExecutorService service) {
95         final Iterable<Class<? extends Object>> allTypes = this.getNotificationTypes(notification);
96         Iterable<NotificationListener<? extends Object>> listenerToNotify = Collections.<NotificationListener<? extends Object>>emptySet();
97         for (final Class<? extends Object> type : allTypes) {
98             Collection<NotificationListener<? extends Object>> _get = this.listeners.get(((Class<? extends Notification>) type));
99             Iterable<NotificationListener<? extends Object>> _plus = Iterables.<NotificationListener<? extends Object>>concat(listenerToNotify, _get);
100             listenerToNotify = _plus;
101         }
102         final Function1<NotificationListener<? extends Object>,NotifyTask> _function = new Function1<NotificationListener<? extends Object>,NotifyTask>() {
103             @Override
104             public NotifyTask apply(final NotificationListener<? extends Object> it) {
105                 NotifyTask _notifyTask = new NotifyTask(it, notification);
106                 return _notifyTask;
107             }
108         };
109         Iterable<NotifyTask> _map = IterableExtensions.<NotificationListener<? extends Object>, NotifyTask>map(listenerToNotify, _function);
110         final Set<NotifyTask> tasks = IterableExtensions.<NotifyTask>toSet(_map);
111         this.submitAll(executor, tasks);
112     }
113
114     public ImmutableSet<Future<Object>> submitAll(final ExecutorService service, final Set<NotifyTask> tasks) {
115         final Builder<Future<Object>> ret = ImmutableSet.<Future<Object>>builder();
116         for (final NotifyTask task : tasks) {
117             Future<Object> _submit = service.<Object>submit(task);
118             ret.add(_submit);
119         }
120         return ret.build();
121     }
122
123     @Override
124     public <T extends Notification> Registration<NotificationListener<T>> registerNotificationListener(final Class<T> notificationType, final NotificationListener<T> listener) {
125         GenericNotificationRegistration<T> _genericNotificationRegistration = new GenericNotificationRegistration<T>(notificationType, listener, this);
126         final GenericNotificationRegistration<T> reg = _genericNotificationRegistration;
127         this.listeners.put(notificationType, listener);
128         this.announceNotificationSubscription(notificationType);
129         return reg;
130     }
131
132     private void announceNotificationSubscription(final Class<? extends Notification> notification) {
133         for (final ListenerRegistration<NotificationInterestListener> listener : interestListeners) {
134             try {
135                 listener.getInstance().onNotificationSubscribtion(notification);
136             } catch (Exception e) {
137                 LOG.warn("Listener {} reported unexpected error on notification {}",
138                         listener.getInstance(), notification, e);
139             }
140         }
141     }
142
143     @Override
144     public Registration<org.opendaylight.yangtools.yang.binding.NotificationListener> registerNotificationListener(final org.opendaylight.yangtools.yang.binding.NotificationListener listener) {
145         final NotificationInvoker invoker = SingletonHolder.INVOKER_FACTORY.invokerFor(listener);
146         Set<Class<? extends Notification>> _supportedNotifications = invoker.getSupportedNotifications();
147         for (final Class<? extends Notification> notifyType : _supportedNotifications) {
148             {
149                 NotificationListener<Notification> _invocationProxy = invoker.getInvocationProxy();
150                 this.listeners.put(notifyType, _invocationProxy);
151                 this.announceNotificationSubscription(notifyType);
152             }
153         }
154         GeneratedListenerRegistration _generatedListenerRegistration = new GeneratedListenerRegistration(listener, invoker, this);
155         final GeneratedListenerRegistration registration = _generatedListenerRegistration;
156         return (registration);
157     }
158
159     protected boolean unregisterListener(final GenericNotificationRegistration<? extends Object> reg) {
160         Class<? extends Notification> _type = reg.getType();
161         NotificationListener<? extends Notification> _instance = reg.getInstance();
162         boolean _remove = this.listeners.remove(_type, _instance);
163         return _remove;
164     }
165
166     protected void unregisterListener(final GeneratedListenerRegistration reg) {
167         NotificationInvoker _invoker = reg.getInvoker();
168         Set<Class<? extends Notification>> _supportedNotifications = _invoker.getSupportedNotifications();
169         for (final Class<? extends Notification> notifyType : _supportedNotifications) {
170             NotificationInvoker _invoker_1 = reg.getInvoker();
171             NotificationListener<Notification> _invocationProxy = _invoker_1.getInvocationProxy();
172             this.listeners.remove(notifyType, _invocationProxy);
173         }
174     }
175
176     @Override
177     public void close() {
178     }
179
180     @Override
181     public ListenerRegistration<NotificationInterestListener> registerInterestListener(final NotificationInterestListener interestListener) {
182         final ListenerRegistration<NotificationInterestListener> registration = this.interestListeners.register(interestListener);
183         Set<Class<? extends Notification>> _keySet = this.listeners.keySet();
184         for (final Class<? extends Notification> notification : _keySet) {
185             interestListener.onNotificationSubscribtion(notification);
186         }
187         return registration;
188     }
189 }