Make sure listeners are immutable
[mdsal.git] / dom / mdsal-dom-broker / src / main / java / org / opendaylight / mdsal / dom / broker / DOMNotificationRouter.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.mdsal.dom.broker;
9
10 import com.google.common.annotations.VisibleForTesting;
11 import com.google.common.collect.ImmutableList;
12 import com.google.common.collect.ImmutableMultimap;
13 import com.google.common.collect.Multimaps;
14 import com.google.common.util.concurrent.Futures;
15 import com.google.common.util.concurrent.ListenableFuture;
16 import com.google.common.util.concurrent.MoreExecutors;
17 import com.google.common.util.concurrent.ThreadFactoryBuilder;
18 import java.util.ArrayList;
19 import java.util.Collection;
20 import java.util.HashMap;
21 import java.util.List;
22 import java.util.Map;
23 import java.util.Set;
24 import java.util.concurrent.ExecutorService;
25 import java.util.concurrent.Executors;
26 import java.util.concurrent.ScheduledThreadPoolExecutor;
27 import java.util.concurrent.TimeUnit;
28 import javax.annotation.PreDestroy;
29 import javax.inject.Inject;
30 import org.eclipse.jdt.annotation.NonNull;
31 import org.opendaylight.mdsal.dom.api.DOMNotification;
32 import org.opendaylight.mdsal.dom.api.DOMNotificationListener;
33 import org.opendaylight.mdsal.dom.api.DOMNotificationPublishService;
34 import org.opendaylight.mdsal.dom.api.DOMNotificationService;
35 import org.opendaylight.mdsal.dom.spi.DOMNotificationSubscriptionListener;
36 import org.opendaylight.mdsal.dom.spi.DOMNotificationSubscriptionListenerRegistry;
37 import org.opendaylight.yangtools.concepts.AbstractListenerRegistration;
38 import org.opendaylight.yangtools.concepts.AbstractRegistration;
39 import org.opendaylight.yangtools.concepts.ListenerRegistration;
40 import org.opendaylight.yangtools.concepts.Registration;
41 import org.opendaylight.yangtools.util.ListenerRegistry;
42 import org.opendaylight.yangtools.util.concurrent.EqualityQueuedNotificationManager;
43 import org.opendaylight.yangtools.util.concurrent.QueuedNotificationManager;
44 import org.opendaylight.yangtools.yang.common.Empty;
45 import org.opendaylight.yangtools.yang.model.api.stmt.SchemaNodeIdentifier.Absolute;
46 import org.osgi.service.component.annotations.Activate;
47 import org.osgi.service.component.annotations.Component;
48 import org.osgi.service.component.annotations.Deactivate;
49 import org.osgi.service.metatype.annotations.AttributeDefinition;
50 import org.osgi.service.metatype.annotations.Designate;
51 import org.osgi.service.metatype.annotations.ObjectClassDefinition;
52 import org.slf4j.Logger;
53 import org.slf4j.LoggerFactory;
54
55 /**
56  * Joint implementation of {@link DOMNotificationPublishService} and {@link DOMNotificationService}. Provides
57  * routing of notifications from publishers to subscribers.
58  *
59  *<p>
60  * Internal implementation one by using a {@link QueuedNotificationManager}.
61  *</p>
62  */
63 @Component(immediate = true, configurationPid = "org.opendaylight.mdsal.dom.notification", service = {
64     DOMNotificationService.class, DOMNotificationPublishService.class,
65     DOMNotificationSubscriptionListenerRegistry.class
66 })
67 @Designate(ocd = DOMNotificationRouter.Config.class)
68 // Non-final for testing
69 public class DOMNotificationRouter implements AutoCloseable, DOMNotificationPublishService,
70         DOMNotificationService, DOMNotificationSubscriptionListenerRegistry {
71     @ObjectClassDefinition()
72     public @interface Config {
73         @AttributeDefinition(name = "notification-queue-depth")
74         int queueDepth() default 65536;
75     }
76
77     @VisibleForTesting
78     abstract static sealed class Reg<T extends DOMNotificationListener> extends AbstractListenerRegistration<T> {
79         Reg(final @NonNull T listener) {
80             super(listener);
81         }
82     }
83
84     private final class SingleReg<T extends DOMNotificationListener> extends Reg<T> {
85         SingleReg(final @NonNull T listener) {
86             super(listener);
87         }
88
89         @Override
90         protected void removeRegistration() {
91             DOMNotificationRouter.this.removeRegistration(this);
92         }
93     }
94
95     private static final class ComponentReg extends Reg<DOMNotificationListener> {
96         ComponentReg(final @NonNull DOMNotificationListener listener) {
97             super(listener);
98         }
99
100         @Override
101         protected void removeRegistration() {
102             // No-op
103         }
104     }
105
106     private static final Logger LOG = LoggerFactory.getLogger(DOMNotificationRouter.class);
107     private static final @NonNull ListenableFuture<?> NO_LISTENERS = Futures.immediateFuture(Empty.value());
108
109     private final ListenerRegistry<DOMNotificationSubscriptionListener> subscriptionListeners =
110             ListenerRegistry.create();
111     private final EqualityQueuedNotificationManager<AbstractListenerRegistration<? extends DOMNotificationListener>,
112                 DOMNotificationRouterEvent> queueNotificationManager;
113     private final ScheduledThreadPoolExecutor observer;
114     private final ExecutorService executor;
115
116     private volatile ImmutableMultimap<Absolute, Reg<?>> listeners = ImmutableMultimap.of();
117
118     @Inject
119     public DOMNotificationRouter(final int maxQueueCapacity) {
120         observer = new ScheduledThreadPoolExecutor(1, new ThreadFactoryBuilder()
121             .setDaemon(true)
122             .setNameFormat("DOMNotificationRouter-observer-%d")
123             .build());
124         executor = Executors.newCachedThreadPool(new ThreadFactoryBuilder()
125             .setDaemon(true)
126             .setNameFormat("DOMNotificationRouter-listeners-%d")
127             .build());
128         queueNotificationManager = new EqualityQueuedNotificationManager<>("DOMNotificationRouter", executor,
129                 maxQueueCapacity, DOMNotificationRouter::deliverEvents);
130         LOG.info("DOM Notification Router started");
131     }
132
133     @Activate
134     public DOMNotificationRouter(final Config config) {
135         this(config.queueDepth());
136     }
137
138     @Override
139     public synchronized <T extends DOMNotificationListener> ListenerRegistration<T> registerNotificationListener(
140             final T listener, final Collection<Absolute> types) {
141         final var reg = new SingleReg<>(listener);
142
143         if (!types.isEmpty()) {
144             final var b = ImmutableMultimap.<Absolute, Reg<?>>builder();
145             b.putAll(listeners);
146
147             for (var t : types) {
148                 b.put(t, reg);
149             }
150
151             replaceListeners(b.build());
152         }
153
154         return reg;
155     }
156
157     @Override
158     public synchronized Registration registerNotificationListeners(
159             final Map<Absolute, DOMNotificationListener> typeToListener) {
160         final var b = ImmutableMultimap.<Absolute, Reg<?>>builder();
161         b.putAll(listeners);
162
163         final var tmp = new HashMap<DOMNotificationListener, ComponentReg>();
164         for (var e : typeToListener.entrySet()) {
165             b.put(e.getKey(), tmp.computeIfAbsent(e.getValue(), ComponentReg::new));
166         }
167         replaceListeners(b.build());
168
169         final var regs = List.copyOf(tmp.values());
170         return new AbstractRegistration() {
171             @Override
172             protected void removeRegistration() {
173                 regs.forEach(ComponentReg::close);
174                 removeRegistrations(regs);
175             }
176         };
177     }
178
179     private synchronized void removeRegistration(final SingleReg<?> reg) {
180         replaceListeners(ImmutableMultimap.copyOf(Multimaps.filterValues(listeners, input -> input != reg)));
181     }
182
183     private synchronized void removeRegistrations(final List<ComponentReg> regs) {
184         replaceListeners(ImmutableMultimap.copyOf(Multimaps.filterValues(listeners, input -> !regs.contains(input))));
185     }
186
187     /**
188      * Swaps registered listeners and triggers notification update.
189      *
190      * @param newListeners is used to notify listenerTypes changed
191      */
192     private void replaceListeners(final ImmutableMultimap<Absolute, Reg<?>> newListeners) {
193         listeners = newListeners;
194         notifyListenerTypesChanged(newListeners.keySet());
195     }
196
197     @SuppressWarnings("checkstyle:IllegalCatch")
198     private void notifyListenerTypesChanged(final Set<Absolute> typesAfter) {
199         final var listenersAfter = subscriptionListeners.streamListeners().collect(ImmutableList.toImmutableList());
200         executor.execute(() -> {
201             for (var subListener : listenersAfter) {
202                 try {
203                     subListener.onSubscriptionChanged(typesAfter);
204                 } catch (final Exception e) {
205                     LOG.warn("Uncaught exception during invoking listener {}", subListener, e);
206                 }
207             }
208         });
209     }
210
211     @Override
212     public <L extends DOMNotificationSubscriptionListener> ListenerRegistration<L> registerSubscriptionListener(
213             final L listener) {
214         final var initialTypes = listeners.keySet();
215         executor.execute(() -> listener.onSubscriptionChanged(initialTypes));
216         return subscriptionListeners.register(listener);
217     }
218
219     @VisibleForTesting
220     @NonNull ListenableFuture<?> publish(final DOMNotification notification, final Collection<Reg<?>> subscribers) {
221         final var futures = new ArrayList<ListenableFuture<?>>(subscribers.size());
222         subscribers.forEach(subscriber -> {
223             final var event = new DOMNotificationRouterEvent(notification);
224             futures.add(event.future());
225             queueNotificationManager.submitNotification(subscriber, event);
226         });
227         return Futures.transform(Futures.successfulAsList(futures), ignored -> Empty.value(),
228             MoreExecutors.directExecutor());
229     }
230
231     @Override
232     public ListenableFuture<? extends Object> putNotification(final DOMNotification notification)
233             throws InterruptedException {
234         final var subscribers = listeners.get(notification.getType());
235         return subscribers.isEmpty() ? NO_LISTENERS : publish(notification, subscribers);
236     }
237
238     @Override
239     public ListenableFuture<? extends Object> offerNotification(final DOMNotification notification) {
240         final var subscribers = listeners.get(notification.getType());
241         return subscribers.isEmpty() ? NO_LISTENERS : publish(notification, subscribers);
242     }
243
244     @Override
245     public ListenableFuture<? extends Object> offerNotification(final DOMNotification notification, final long timeout,
246             final TimeUnit unit) throws InterruptedException {
247         final var subscribers = listeners.get(notification.getType());
248         if (subscribers.isEmpty()) {
249             return NO_LISTENERS;
250         }
251         // Attempt to perform a non-blocking publish first
252         final var noBlock = publish(notification, subscribers);
253         if (!DOMNotificationPublishService.REJECTED.equals(noBlock)) {
254             return noBlock;
255         }
256
257         try {
258             final var publishThread = Thread.currentThread();
259             final var timerTask = observer.schedule(publishThread::interrupt, timeout, unit);
260             final var withBlock = putNotification(notification);
261             timerTask.cancel(true);
262             if (observer.getQueue().size() > 50) {
263                 observer.purge();
264             }
265             return withBlock;
266         } catch (InterruptedException e) {
267             return DOMNotificationPublishService.REJECTED;
268         }
269     }
270
271     @PreDestroy
272     @Deactivate
273     @Override
274     public void close() {
275         observer.shutdown();
276         executor.shutdown();
277         LOG.info("DOM Notification Router stopped");
278     }
279
280     @VisibleForTesting
281     ExecutorService executor() {
282         return executor;
283     }
284
285     @VisibleForTesting
286     ExecutorService observer() {
287         return observer;
288     }
289
290     @VisibleForTesting
291     ImmutableMultimap<Absolute, ?> listeners() {
292         return listeners;
293     }
294
295     @VisibleForTesting
296     ListenerRegistry<DOMNotificationSubscriptionListener> subscriptionListeners() {
297         return subscriptionListeners;
298     }
299
300     private static void deliverEvents(final AbstractListenerRegistration<? extends DOMNotificationListener> reg,
301             final ImmutableList<DOMNotificationRouterEvent> events) {
302         if (reg.notClosed()) {
303             final var listener = reg.getInstance();
304             for (var event : events) {
305                 event.deliverTo(listener);
306             }
307         } else {
308             events.forEach(DOMNotificationRouterEvent::clear);
309         }
310     }
311 }