2 * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.controller.sal.binding.impl;
10 import java.util.Arrays;
11 import java.util.Collection;
12 import java.util.HashSet;
14 import java.util.concurrent.ExecutorService;
16 import org.opendaylight.controller.sal.binding.api.NotificationListener;
17 import org.opendaylight.controller.sal.binding.api.NotificationProviderService;
18 import org.opendaylight.controller.sal.binding.codegen.impl.SingletonHolder;
19 import org.opendaylight.controller.sal.binding.spi.NotificationInvokerFactory.NotificationInvoker;
20 import org.opendaylight.yangtools.concepts.AbstractListenerRegistration;
21 import org.opendaylight.yangtools.concepts.ListenerRegistration;
22 import org.opendaylight.yangtools.concepts.util.ListenerRegistry;
23 import org.opendaylight.yangtools.yang.binding.Notification;
24 import org.slf4j.Logger;
25 import org.slf4j.LoggerFactory;
27 import com.google.common.base.Preconditions;
28 import com.google.common.base.Predicate;
29 import com.google.common.collect.HashMultimap;
30 import com.google.common.collect.Iterables;
31 import com.google.common.collect.Multimap;
32 import com.google.common.collect.Multimaps;
34 public class NotificationBrokerImpl implements NotificationProviderService, AutoCloseable {
35 private static final Logger LOG = LoggerFactory.getLogger(NotificationBrokerImpl.class);
37 private final ListenerRegistry<NotificationInterestListener> interestListeners =
38 ListenerRegistry.create();
40 private final Multimap<Class<? extends Notification>, NotificationListenerRegistration<?>> listeners =
41 Multimaps.synchronizedSetMultimap(HashMultimap.<Class<? extends Notification>, NotificationListenerRegistration<?>>create());
42 private ExecutorService executor;
45 public NotificationBrokerImpl(final ExecutorService executor) {
46 this.setExecutor(executor);
49 public void setExecutor(final ExecutorService executor) {
50 this.executor = Preconditions.checkNotNull(executor);
53 public Iterable<Class<?>> getNotificationTypes(final Notification notification) {
54 final Class<?>[] ifaces = notification.getClass().getInterfaces();
55 return Iterables.filter(Arrays.asList(ifaces), new Predicate<Class<?>>() {
57 public boolean apply(final Class<?> input) {
58 if (Notification.class.equals(input)) {
61 return Notification.class.isAssignableFrom(input);
67 public void publish(final Notification notification) {
68 this.publish(notification, executor);
72 public void publish(final Notification notification, final ExecutorService service) {
73 final Set<NotificationListenerRegistration<?>> toNotify = new HashSet<>();
75 for (final Class<?> type : getNotificationTypes(notification)) {
76 final Collection<NotificationListenerRegistration<?>> l = listeners.get((Class<? extends Notification>) type);
82 for (NotificationListenerRegistration<?> r : toNotify) {
83 service.submit(new NotifyTask(r, notification));
87 private void addRegistrations(final NotificationListenerRegistration<?>... registrations) {
88 for (NotificationListenerRegistration<?> reg : registrations) {
89 listeners.put(reg.getType(), reg);
90 this.announceNotificationSubscription(reg.getType());
94 void removeRegistrations(final NotificationListenerRegistration<?>... registrations) {
95 for (NotificationListenerRegistration<?> reg : registrations) {
96 listeners.remove(reg.getType(), reg);
101 public <T extends Notification> NotificationListenerRegistration<T> registerNotificationListener(final Class<T> notificationType, final NotificationListener<T> listener) {
102 final NotificationListenerRegistration<T> reg = new AbstractNotificationListenerRegistration<T>(notificationType, listener) {
104 protected void removeRegistration() {
105 removeRegistrations(this);
109 addRegistrations(reg);
113 private void announceNotificationSubscription(final Class<? extends Notification> notification) {
114 for (final ListenerRegistration<NotificationInterestListener> listener : interestListeners) {
116 listener.getInstance().onNotificationSubscribtion(notification);
117 } catch (Exception e) {
118 LOG.warn("Listener {} reported unexpected error on notification {}",
119 listener.getInstance(), notification, e);
125 public ListenerRegistration<org.opendaylight.yangtools.yang.binding.NotificationListener> registerNotificationListener(final org.opendaylight.yangtools.yang.binding.NotificationListener listener) {
126 final NotificationInvoker invoker = SingletonHolder.INVOKER_FACTORY.invokerFor(listener);
127 final Set<Class<? extends Notification>> types = invoker.getSupportedNotifications();
128 final NotificationListenerRegistration<?>[] regs = new NotificationListenerRegistration<?>[types.size()];
130 // Populate the registrations...
132 for (Class<? extends Notification> type : types) {
133 regs[i] = new AggregatedNotificationListenerRegistration<Notification, Object>(type, invoker.getInvocationProxy(), regs) {
135 protected void removeRegistration() {
136 // Nothing to do, will be cleaned up by parent (below)
142 // ... now put them to use ...
143 addRegistrations(regs);
145 // ... finally return the parent registration
146 return new AbstractListenerRegistration<org.opendaylight.yangtools.yang.binding.NotificationListener>(listener) {
148 protected void removeRegistration() {
149 removeRegistrations(regs);
150 for (ListenerRegistration<?> reg : regs) {
158 public void close() {
162 public ListenerRegistration<NotificationInterestListener> registerInterestListener(final NotificationInterestListener interestListener) {
163 final ListenerRegistration<NotificationInterestListener> registration = this.interestListeners.register(interestListener);
164 for (final Class<? extends Notification> notification : listeners.keySet()) {
165 interestListener.onNotificationSubscribtion(notification);