BUG-1120: optimize notification delivery fast path
[controller.git] / opendaylight / md-sal / sal-binding-broker / src / main / java / org / opendaylight / controller / sal / binding / impl / NotificationBrokerImpl.java
1 /**
2  * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.sal.binding.impl;
9
10 import java.util.Set;
11 import java.util.concurrent.ExecutorService;
12 import java.util.concurrent.atomic.AtomicReference;
13
14 import javax.annotation.concurrent.GuardedBy;
15
16 import org.opendaylight.controller.sal.binding.api.NotificationListener;
17 import org.opendaylight.controller.sal.binding.api.NotificationProviderService;
18 import org.opendaylight.controller.sal.binding.codegen.impl.SingletonHolder;
19 import org.opendaylight.controller.sal.binding.spi.NotificationInvokerFactory.NotificationInvoker;
20 import org.opendaylight.yangtools.concepts.AbstractListenerRegistration;
21 import org.opendaylight.yangtools.concepts.ListenerRegistration;
22 import org.opendaylight.yangtools.concepts.util.ListenerRegistry;
23 import org.opendaylight.yangtools.yang.binding.Notification;
24 import org.slf4j.Logger;
25 import org.slf4j.LoggerFactory;
26
27 import com.google.common.base.Preconditions;
28 import com.google.common.collect.HashMultimap;
29 import com.google.common.collect.Multimap;
30
31 public class NotificationBrokerImpl implements NotificationProviderService, AutoCloseable {
32     private static final Logger LOG = LoggerFactory.getLogger(NotificationBrokerImpl.class);
33
34     private final ListenerRegistry<NotificationInterestListener> interestListeners =
35             ListenerRegistry.create();
36     private final AtomicReference<ListenerMapGeneration> listeners = new AtomicReference<>(new ListenerMapGeneration());
37     private final ExecutorService executor;
38
39     public NotificationBrokerImpl(final ExecutorService executor) {
40         this.executor = Preconditions.checkNotNull(executor);
41     }
42
43     @Override
44     public void publish(final Notification notification) {
45         publish(notification, executor);
46     }
47
48     @Override
49     public void publish(final Notification notification, final ExecutorService service) {
50         for (NotificationListenerRegistration<?> r : listeners.get().listenersFor(notification)) {
51             service.submit(new NotifyTask(r, notification));
52         }
53     }
54
55     @GuardedBy("this")
56     private Multimap<Class<? extends Notification>, NotificationListenerRegistration<?>> mutableListeners() {
57         return HashMultimap.create(listeners.get().getListeners());
58     }
59
60     private final void addRegistrations(final NotificationListenerRegistration<?>... registrations) {
61         synchronized (this) {
62             final Multimap<Class<? extends Notification>, NotificationListenerRegistration<?>> newListeners =
63                     mutableListeners();
64             for (NotificationListenerRegistration<?> reg : registrations) {
65                 newListeners.put(reg.getType(), reg);
66             }
67
68             listeners.set(new ListenerMapGeneration(newListeners));
69         }
70
71         // Notifications are dispatched out of lock...
72         for (NotificationListenerRegistration<?> reg : registrations) {
73             announceNotificationSubscription(reg.getType());
74         }
75     }
76
77     private synchronized void removeRegistrations(final NotificationListenerRegistration<?>... registrations) {
78         final Multimap<Class<? extends Notification>, NotificationListenerRegistration<?>> newListeners =
79                 mutableListeners();
80
81         for (NotificationListenerRegistration<?> reg : registrations) {
82             newListeners.remove(reg.getType(), reg);
83         }
84
85         listeners.set(new ListenerMapGeneration(newListeners));
86     }
87
88     private void announceNotificationSubscription(final Class<? extends Notification> notification) {
89         for (final ListenerRegistration<NotificationInterestListener> listener : interestListeners) {
90             try {
91                 listener.getInstance().onNotificationSubscribtion(notification);
92             } catch (Exception e) {
93                 LOG.warn("Listener {} reported unexpected error on notification {}",
94                         listener.getInstance(), notification, e);
95             }
96         }
97     }
98
99     @Override
100     public ListenerRegistration<NotificationInterestListener> registerInterestListener(final NotificationInterestListener interestListener) {
101         final ListenerRegistration<NotificationInterestListener> registration = this.interestListeners.register(interestListener);
102
103         for (final Class<? extends Notification> notification : listeners.get().getKnownTypes()) {
104             interestListener.onNotificationSubscribtion(notification);
105         }
106         return registration;
107     }
108
109     @Override
110     public <T extends Notification> NotificationListenerRegistration<T> registerNotificationListener(final Class<T> notificationType, final NotificationListener<T> listener) {
111         final NotificationListenerRegistration<T> reg = new AbstractNotificationListenerRegistration<T>(notificationType, listener) {
112             @Override
113             protected void removeRegistration() {
114                 removeRegistrations(this);
115             }
116         };
117
118         addRegistrations(reg);
119         return reg;
120     }
121
122     @Override
123     public ListenerRegistration<org.opendaylight.yangtools.yang.binding.NotificationListener> registerNotificationListener(final org.opendaylight.yangtools.yang.binding.NotificationListener listener) {
124         final NotificationInvoker invoker = SingletonHolder.INVOKER_FACTORY.invokerFor(listener);
125         final Set<Class<? extends Notification>> types = invoker.getSupportedNotifications();
126         final NotificationListenerRegistration<?>[] regs = new NotificationListenerRegistration<?>[types.size()];
127
128         // Populate the registrations...
129         int i = 0;
130         for (Class<? extends Notification> type : types) {
131             regs[i] = new AggregatedNotificationListenerRegistration<Notification, Object>(type, invoker.getInvocationProxy(), regs) {
132                 @Override
133                 protected void removeRegistration() {
134                     // Nothing to do, will be cleaned up by parent (below)
135                 }
136             };
137             ++i;
138         }
139
140         // ... now put them to use ...
141         addRegistrations(regs);
142
143         // ... finally return the parent registration
144         return new AbstractListenerRegistration<org.opendaylight.yangtools.yang.binding.NotificationListener>(listener) {
145             @Override
146             protected void removeRegistration() {
147                 removeRegistrations(regs);
148                 for (ListenerRegistration<?> reg : regs) {
149                     reg.close();
150                 }
151             }
152         };
153     }
154
155     @Override
156     public void close() {
157     }
158
159 }