2 * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.controller.sal.binding.impl;
11 import java.util.concurrent.ExecutorService;
13 import org.opendaylight.controller.sal.binding.api.NotificationListener;
14 import org.opendaylight.controller.sal.binding.api.NotificationProviderService;
15 import org.opendaylight.controller.sal.binding.codegen.impl.SingletonHolder;
16 import org.opendaylight.controller.sal.binding.spi.NotificationInvokerFactory.NotificationInvoker;
17 import org.opendaylight.yangtools.concepts.AbstractListenerRegistration;
18 import org.opendaylight.yangtools.concepts.ListenerRegistration;
19 import org.opendaylight.yangtools.concepts.util.ListenerRegistry;
20 import org.opendaylight.yangtools.yang.binding.Notification;
21 import org.slf4j.Logger;
22 import org.slf4j.LoggerFactory;
24 import com.google.common.base.Preconditions;
/**
 * Notification broker implementing {@link NotificationProviderService}.
 *
 * It holds the notification listener registrations, a registry of
 * {@link NotificationInterestListener}s that are told which notification types have
 * been subscribed to, and the executor used when a notification is published
 * without an explicit executor.
 */
26 public class NotificationBrokerImpl implements NotificationProviderService, AutoCloseable {
27 private static final Logger LOG = LoggerFactory.getLogger(NotificationBrokerImpl.class);
// Listeners that are called back (onNotificationSubscribtion) with subscribed notification types.
29 private final ListenerRegistry<NotificationInterestListener> interestListeners =
30 ListenerRegistry.create();
// Notification listener registrations; looked up per notification when publishing.
31 private final NotificationListenerMap listeners = new NotificationListenerMap();
// Executor used by publish(Notification); null is rejected by the constructor.
32 private final ExecutorService executor;
/**
 * Creates a broker that uses the supplied executor for {@link #publish(Notification)}.
 *
 * @param executor executor to which notification delivery tasks are submitted by default
 * @throws NullPointerException if {@code executor} is null (raised by
 *         {@code Preconditions.checkNotNull})
 */
34 public NotificationBrokerImpl(final ExecutorService executor) {
35 this.executor = Preconditions.checkNotNull(executor);
/**
 * Publishes a notification using the executor this broker was constructed with.
 *
 * @param notification notification to deliver to the matching listeners
 */
39 public void publish(final Notification notification) {
// Delegate to the two-argument overload with the broker's own executor.
40 publish(notification, executor);
/**
 * Publishes a notification on the given executor.
 *
 * One {@code NotifyTask} is submitted for every registration that
 * {@code listeners.listenersFor(notification)} returns.
 *
 * @param notification notification to deliver
 * @param service executor to which the delivery tasks are submitted
 */
44 public void publish(final Notification notification, final ExecutorService service) {
45 for (NotificationListenerRegistration<?> r : listeners.listenersFor(notification)) {
// The Future returned by submit() is not retained.
46 service.submit(new NotifyTask(r, notification));
/**
 * Adds the given registrations to the listener map and then announces the
 * notification type of each of them to the interest listeners.
 *
 * @param registrations registrations to add
 */
50 private final void addRegistrations(final NotificationListenerRegistration<?>... registrations) {
// Store all registrations first, then announce; one announcement is made per registration.
51 listeners.addRegistrations(registrations);
52 for (NotificationListenerRegistration<?> reg : registrations) {
53 announceNotificationSubscription(reg.getType());
/**
 * Calls {@code onNotificationSubscribtion} on every registered interest listener
 * for the given notification type.
 *
 * Any {@link Exception} thrown by a listener is caught and logged at WARN level
 * together with the listener instance and the notification type.
 *
 * @param notification notification type that has been subscribed to
 */
57 private void announceNotificationSubscription(final Class<? extends Notification> notification) {
58 for (final ListenerRegistration<NotificationInterestListener> listener : interestListeners) {
60 listener.getInstance().onNotificationSubscribtion(notification);
// Catch broadly: the listener is external code; presumably this keeps one failing
// listener from preventing the others from being notified — confirm.
61 } catch (Exception e) {
// The exception is passed as the trailing argument so SLF4J logs its stack trace.
62 LOG.warn("Listener {} reported unexpected error on notification {}",
63 listener.getInstance(), notification, e);
/**
 * Registers an interest listener and immediately calls it back with every
 * notification type currently reported by {@code listeners.getKnownTypes()}.
 *
 * @param interestListener listener to register
 * @return a registration for {@code interestListener} (declared return type; presumably
 *         the one obtained from the interest listener registry — confirm)
 */
69 public ListenerRegistration<NotificationInterestListener> registerInterestListener(final NotificationInterestListener interestListener) {
// Register first, then replay the already known types to the new listener.
70 final ListenerRegistration<NotificationInterestListener> registration = this.interestListeners.register(interestListener);
71 for (final Class<? extends Notification> notification : listeners.getKnownTypes()) {
// NOTE(review): unlike announceNotificationSubscription, this callback is not guarded
// against exceptions; a throwing listener would propagate to the caller after it has
// already been registered — confirm this is intended.
72 interestListener.onNotificationSubscribtion(notification);
/**
 * Registers a listener for a single notification type.
 *
 * @param notificationType notification class the listener is registered for
 * @param listener listener to register
 * @param <T> notification type
 * @return a registration for {@code listener} (declared return type)
 */
78 public <T extends Notification> NotificationListenerRegistration<T> registerNotificationListener(final Class<T> notificationType, final NotificationListener<T> listener) {
79 final NotificationListenerRegistration<T> reg = new AbstractNotificationListenerRegistration<T>(notificationType, listener) {
// On removal the registration takes itself out of the broker's listener map.
81 protected void removeRegistration() {
82 listeners.removeRegistrations(this);
// Adds the registration to the listener map and announces its type to interest listeners.
86 addRegistrations(reg);
/**
 * Registers a listener object for all notification types its invoker supports.
 *
 * A {@code NotificationInvoker} is obtained for the listener from
 * {@code SingletonHolder.INVOKER_FACTORY}; one registration is created per type in
 * {@code invoker.getSupportedNotifications()}, all of them are added to the broker,
 * and a single parent registration wrapping the original listener is returned.
 *
 * @param listener listener to register
 * @return parent registration whose removal takes all per-type registrations out of
 *         the listener map
 */
91 public ListenerRegistration<org.opendaylight.yangtools.yang.binding.NotificationListener> registerNotificationListener(final org.opendaylight.yangtools.yang.binding.NotificationListener listener) {
92 final NotificationInvoker invoker = SingletonHolder.INVOKER_FACTORY.invokerFor(listener);
93 final Set<Class<? extends Notification>> types = invoker.getSupportedNotifications();
// One array slot per supported notification type.
94 final NotificationListenerRegistration<?>[] regs = new NotificationListenerRegistration<?>[types.size()];
96 // Populate the registrations...
98 for (Class<? extends Notification> type : types) {
// Each per-type registration receives the invoker's invocation proxy and the shared regs array.
99 regs[i] = new AggregatedNotificationListenerRegistration<Notification, Object>(type, invoker.getInvocationProxy(), regs) {
101 protected void removeRegistration() {
102 // Nothing to do, will be cleaned up by parent (below)
108 // ... now put them to use ...
109 addRegistrations(regs);
111 // ... finally return the parent registration
112 return new AbstractListenerRegistration<org.opendaylight.yangtools.yang.binding.NotificationListener>(listener) {
114 protected void removeRegistration() {
// Remove every per-type registration from the listener map in one call, then visit each
// of them individually (presumably to close it — confirm).
115 listeners.removeRegistrations(regs);
116 for (ListenerRegistration<?> reg : regs) {
124 public void close() {