/*
 * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved.
 *
 * This program and the accompanying materials are made available under the
 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
 * and is available at http://www.eclipse.org/legal/epl-v10.html
 */
8 package org.opendaylight.controller.md.sal.binding.compat;
import com.google.common.base.Preconditions;
import com.google.common.collect.HashMultimap;
import com.google.common.collect.Multimap;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicReference;
import javax.annotation.concurrent.GuardedBy;
import org.opendaylight.controller.sal.binding.api.NotificationListener;
import org.opendaylight.controller.sal.binding.api.NotificationProviderService;
import org.opendaylight.yangtools.concepts.AbstractListenerRegistration;
import org.opendaylight.yangtools.concepts.ListenerRegistration;
import org.opendaylight.yangtools.util.ListenerRegistry;
import org.opendaylight.yangtools.yang.binding.Notification;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
27 public class HydrogenNotificationBrokerImpl implements NotificationProviderService, AutoCloseable {
28 private static final Logger LOG = LoggerFactory.getLogger(HydrogenNotificationBrokerImpl.class);
30 private final ListenerRegistry<NotificationInterestListener> interestListeners =
31 ListenerRegistry.create();
32 private final AtomicReference<ListenerMapGeneration> listeners = new AtomicReference<>(new ListenerMapGeneration());
33 private final ExecutorService executor;
/**
 * Creates a broker whose single-argument {@code publish} dispatches on the given executor.
 *
 * @param executor executor used when a caller publishes without naming one; must not be null
 * @throws NullPointerException if {@code executor} is null
 */
public HydrogenNotificationBrokerImpl(final ExecutorService executor) {
    // Name the offending argument so the failure is self-explanatory.
    this.executor = Preconditions.checkNotNull(executor, "executor");
}
40 public void publish(final Notification notification) {
41 publish(notification, executor);
45 public void publish(final Notification notification, final ExecutorService service) {
46 for (final NotificationListenerRegistration<?> r : listeners.get().listenersFor(notification)) {
47 service.submit(new NotifyTask(r, notification));
52 private Multimap<Class<? extends Notification>, NotificationListenerRegistration<?>> mutableListeners() {
53 return HashMultimap.create(listeners.get().getListeners());
56 private void addRegistrations(final NotificationListenerRegistration<?>... registrations) {
58 final Multimap<Class<? extends Notification>, NotificationListenerRegistration<?>> newListeners =
60 for (final NotificationListenerRegistration<?> reg : registrations) {
61 newListeners.put(reg.getType(), reg);
64 listeners.set(new ListenerMapGeneration(newListeners));
67 // Notifications are dispatched out of lock...
68 for (final NotificationListenerRegistration<?> reg : registrations) {
69 announceNotificationSubscription(reg.getType());
73 private synchronized void removeRegistrations(final NotificationListenerRegistration<?>... registrations) {
74 final Multimap<Class<? extends Notification>, NotificationListenerRegistration<?>> newListeners =
77 for (final NotificationListenerRegistration<?> reg : registrations) {
78 newListeners.remove(reg.getType(), reg);
81 listeners.set(new ListenerMapGeneration(newListeners));
84 private void announceNotificationSubscription(final Class<? extends Notification> notification) {
85 for (final ListenerRegistration<NotificationInterestListener> listener : interestListeners) {
87 listener.getInstance().onNotificationSubscribtion(notification);
88 } catch (final Exception e) {
89 LOG.warn("Listener {} reported unexpected error on notification {}",
90 listener.getInstance(), notification, e);
96 public ListenerRegistration<NotificationInterestListener> registerInterestListener(final NotificationInterestListener interestListener) {
97 final ListenerRegistration<NotificationInterestListener> registration = this.interestListeners.register(interestListener);
99 for (final Class<? extends Notification> notification : listeners.get().getKnownTypes()) {
100 interestListener.onNotificationSubscribtion(notification);
106 public <T extends Notification> NotificationListenerRegistration<T> registerNotificationListener(final Class<T> notificationType, final NotificationListener<T> listener) {
107 final NotificationListenerRegistration<T> reg = new AbstractNotificationListenerRegistration<T>(notificationType, listener) {
109 protected void removeRegistration() {
110 removeRegistrations(this);
114 addRegistrations(reg);
119 public ListenerRegistration<org.opendaylight.yangtools.yang.binding.NotificationListener> registerNotificationListener(final org.opendaylight.yangtools.yang.binding.NotificationListener listener) {
120 final NotificationInvoker invoker = NotificationInvoker.invokerFor(listener);
121 final Set<Class<? extends Notification>> types = invoker.getSupportedNotifications();
122 final NotificationListenerRegistration<?>[] regs = new NotificationListenerRegistration<?>[types.size()];
124 // Populate the registrations...
126 for (final Class<? extends Notification> type : types) {
127 regs[i] = new AggregatedNotificationListenerRegistration<Notification, Object>(type, invoker, regs) {
129 protected void removeRegistration() {
130 // Nothing to do, will be cleaned up by parent (below)
136 // ... now put them to use ...
137 addRegistrations(regs);
139 // ... finally return the parent registration
140 return new AbstractListenerRegistration<org.opendaylight.yangtools.yang.binding.NotificationListener>(listener) {
142 protected void removeRegistration() {
143 removeRegistrations(regs);
144 for (final ListenerRegistration<?> reg : regs) {
152 public void close() {