bee565a48a94dd0dc26736ab3e4c9bab07e1e464
[mdsal.git] / dom / mdsal-dom-broker / src / main / java / org / opendaylight / mdsal / dom / broker / DOMNotificationRouter.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.mdsal.dom.broker;
9
10 import com.google.common.annotations.VisibleForTesting;
11 import com.google.common.collect.ImmutableList;
12 import com.google.common.collect.ImmutableMultimap;
13 import com.google.common.collect.Multimap;
14 import com.google.common.collect.Multimaps;
15 import com.google.common.util.concurrent.Futures;
16 import com.google.common.util.concurrent.ListenableFuture;
17 import com.google.common.util.concurrent.MoreExecutors;
18 import com.google.common.util.concurrent.ThreadFactoryBuilder;
19 import java.util.ArrayList;
20 import java.util.Collection;
21 import java.util.HashMap;
22 import java.util.List;
23 import java.util.Map;
24 import java.util.Set;
25 import java.util.concurrent.ExecutorService;
26 import java.util.concurrent.Executors;
27 import java.util.concurrent.ScheduledThreadPoolExecutor;
28 import java.util.concurrent.TimeUnit;
29 import javax.annotation.PreDestroy;
30 import javax.inject.Inject;
31 import org.eclipse.jdt.annotation.NonNull;
32 import org.opendaylight.mdsal.dom.api.DOMNotification;
33 import org.opendaylight.mdsal.dom.api.DOMNotificationListener;
34 import org.opendaylight.mdsal.dom.api.DOMNotificationPublishService;
35 import org.opendaylight.mdsal.dom.api.DOMNotificationService;
36 import org.opendaylight.mdsal.dom.spi.DOMNotificationSubscriptionListener;
37 import org.opendaylight.mdsal.dom.spi.DOMNotificationSubscriptionListenerRegistry;
38 import org.opendaylight.yangtools.concepts.AbstractListenerRegistration;
39 import org.opendaylight.yangtools.concepts.AbstractRegistration;
40 import org.opendaylight.yangtools.concepts.ListenerRegistration;
41 import org.opendaylight.yangtools.concepts.Registration;
42 import org.opendaylight.yangtools.util.ListenerRegistry;
43 import org.opendaylight.yangtools.util.concurrent.EqualityQueuedNotificationManager;
44 import org.opendaylight.yangtools.util.concurrent.QueuedNotificationManager;
45 import org.opendaylight.yangtools.yang.common.Empty;
46 import org.opendaylight.yangtools.yang.model.api.stmt.SchemaNodeIdentifier.Absolute;
47 import org.osgi.service.component.annotations.Activate;
48 import org.osgi.service.component.annotations.Component;
49 import org.osgi.service.component.annotations.Deactivate;
50 import org.osgi.service.metatype.annotations.AttributeDefinition;
51 import org.osgi.service.metatype.annotations.Designate;
52 import org.osgi.service.metatype.annotations.ObjectClassDefinition;
53 import org.slf4j.Logger;
54 import org.slf4j.LoggerFactory;
55
56 /**
57  * Joint implementation of {@link DOMNotificationPublishService} and {@link DOMNotificationService}. Provides
58  * routing of notifications from publishers to subscribers.
59  *
60  *<p>
61  * Internal implementation one by using a {@link QueuedNotificationManager}.
62  *</p>
63  */
64 @Component(immediate = true, configurationPid = "org.opendaylight.mdsal.dom.notification", service = {
65     DOMNotificationService.class, DOMNotificationPublishService.class,
66     DOMNotificationSubscriptionListenerRegistry.class
67 })
68 @Designate(ocd = DOMNotificationRouter.Config.class)
69 // Non-final for testing
70 public class DOMNotificationRouter implements AutoCloseable, DOMNotificationPublishService,
71         DOMNotificationService, DOMNotificationSubscriptionListenerRegistry {
72     @ObjectClassDefinition()
73     public @interface Config {
74         @AttributeDefinition(name = "notification-queue-depth")
75         int queueDepth() default 65536;
76     }
77
78     @VisibleForTesting
79     abstract static sealed class Reg<T extends DOMNotificationListener> extends AbstractListenerRegistration<T> {
80         Reg(final @NonNull T listener) {
81             super(listener);
82         }
83     }
84
85     private final class SingleReg<T extends DOMNotificationListener> extends Reg<T> {
86         SingleReg(final @NonNull T listener) {
87             super(listener);
88         }
89
90         @Override
91         protected void removeRegistration() {
92             DOMNotificationRouter.this.removeRegistration(this);
93         }
94     }
95
96     private static final class ComponentReg extends Reg<DOMNotificationListener> {
97         ComponentReg(final @NonNull DOMNotificationListener listener) {
98             super(listener);
99         }
100
101         @Override
102         protected void removeRegistration() {
103             // No-op
104         }
105     }
106
107     private static final Logger LOG = LoggerFactory.getLogger(DOMNotificationRouter.class);
108     private static final @NonNull ListenableFuture<?> NO_LISTENERS = Futures.immediateFuture(Empty.value());
109
110     private final ListenerRegistry<DOMNotificationSubscriptionListener> subscriptionListeners =
111             ListenerRegistry.create();
112     private final EqualityQueuedNotificationManager<AbstractListenerRegistration<? extends DOMNotificationListener>,
113                 DOMNotificationRouterEvent> queueNotificationManager;
114     private final ScheduledThreadPoolExecutor observer;
115     private final ExecutorService executor;
116
117     private volatile Multimap<Absolute, Reg<?>> listeners = ImmutableMultimap.of();
118
119     @Inject
120     public DOMNotificationRouter(final int maxQueueCapacity) {
121         observer = new ScheduledThreadPoolExecutor(1, new ThreadFactoryBuilder()
122             .setDaemon(true)
123             .setNameFormat("DOMNotificationRouter-observer-%d")
124             .build());
125         executor = Executors.newCachedThreadPool(new ThreadFactoryBuilder()
126             .setDaemon(true)
127             .setNameFormat("DOMNotificationRouter-listeners-%d")
128             .build());
129         queueNotificationManager = new EqualityQueuedNotificationManager<>("DOMNotificationRouter", executor,
130                 maxQueueCapacity, DOMNotificationRouter::deliverEvents);
131         LOG.info("DOM Notification Router started");
132     }
133
134     @Activate
135     public DOMNotificationRouter(final Config config) {
136         this(config.queueDepth());
137     }
138
139     @Override
140     public synchronized <T extends DOMNotificationListener> ListenerRegistration<T> registerNotificationListener(
141             final T listener, final Collection<Absolute> types) {
142         final var reg = new SingleReg<>(listener);
143
144         if (!types.isEmpty()) {
145             final var b = ImmutableMultimap.<Absolute, Reg<?>>builder();
146             b.putAll(listeners);
147
148             for (var t : types) {
149                 b.put(t, reg);
150             }
151
152             replaceListeners(b.build());
153         }
154
155         return reg;
156     }
157
158     @Override
159     public synchronized Registration registerNotificationListeners(
160             final Map<Absolute, DOMNotificationListener> typeToListener) {
161         final var b = ImmutableMultimap.<Absolute, Reg<?>>builder();
162         b.putAll(listeners);
163
164         final var tmp = new HashMap<DOMNotificationListener, ComponentReg>();
165         for (var e : typeToListener.entrySet()) {
166             b.put(e.getKey(), tmp.computeIfAbsent(e.getValue(), ComponentReg::new));
167         }
168         replaceListeners(b.build());
169
170         final var regs = List.copyOf(tmp.values());
171         return new AbstractRegistration() {
172             @Override
173             protected void removeRegistration() {
174                 regs.forEach(ComponentReg::close);
175                 removeRegistrations(regs);
176             }
177         };
178     }
179
180     private synchronized void removeRegistration(final SingleReg<?> reg) {
181         replaceListeners(ImmutableMultimap.copyOf(Multimaps.filterValues(listeners, input -> input != reg)));
182     }
183
184     private synchronized void removeRegistrations(final List<ComponentReg> regs) {
185         replaceListeners(ImmutableMultimap.copyOf(Multimaps.filterValues(listeners, input -> !regs.contains(input))));
186     }
187
188     /**
189      * Swaps registered listeners and triggers notification update.
190      *
191      * @param newListeners is used to notify listenerTypes changed
192      */
193     private void replaceListeners(final Multimap<Absolute, Reg<?>> newListeners) {
194         listeners = newListeners;
195         notifyListenerTypesChanged(newListeners.keySet());
196     }
197
198     @SuppressWarnings("checkstyle:IllegalCatch")
199     private void notifyListenerTypesChanged(final Set<Absolute> typesAfter) {
200         final var listenersAfter = subscriptionListeners.streamListeners().collect(ImmutableList.toImmutableList());
201         executor.execute(() -> {
202             for (var subListener : listenersAfter) {
203                 try {
204                     subListener.onSubscriptionChanged(typesAfter);
205                 } catch (final Exception e) {
206                     LOG.warn("Uncaught exception during invoking listener {}", subListener, e);
207                 }
208             }
209         });
210     }
211
212     @Override
213     public <L extends DOMNotificationSubscriptionListener> ListenerRegistration<L> registerSubscriptionListener(
214             final L listener) {
215         final var initialTypes = listeners.keySet();
216         executor.execute(() -> listener.onSubscriptionChanged(initialTypes));
217         return subscriptionListeners.register(listener);
218     }
219
220     @VisibleForTesting
221     @NonNull ListenableFuture<? extends Object> publish(final DOMNotification notification,
222             final Collection<Reg<?>> subscribers) {
223         final var futures = new ArrayList<ListenableFuture<Void>>(subscribers.size());
224         subscribers.forEach(subscriber -> {
225             final var event = new DOMNotificationRouterEvent(notification);
226             futures.add(event.future());
227             queueNotificationManager.submitNotification(subscriber, event);
228         });
229         return Futures.transform(Futures.successfulAsList(futures), ignored -> (Void)null,
230             MoreExecutors.directExecutor());
231     }
232
233     @Override
234     public ListenableFuture<? extends Object> putNotification(final DOMNotification notification)
235             throws InterruptedException {
236         final var subscribers = listeners.get(notification.getType());
237         return subscribers.isEmpty() ? NO_LISTENERS : publish(notification, subscribers);
238     }
239
240     @Override
241     public ListenableFuture<? extends Object> offerNotification(final DOMNotification notification) {
242         final var subscribers = listeners.get(notification.getType());
243         return subscribers.isEmpty() ? NO_LISTENERS : publish(notification, subscribers);
244     }
245
246     @Override
247     public ListenableFuture<? extends Object> offerNotification(final DOMNotification notification, final long timeout,
248             final TimeUnit unit) throws InterruptedException {
249         final var subscribers = listeners.get(notification.getType());
250         if (subscribers.isEmpty()) {
251             return NO_LISTENERS;
252         }
253         // Attempt to perform a non-blocking publish first
254         final var noBlock = publish(notification, subscribers);
255         if (!DOMNotificationPublishService.REJECTED.equals(noBlock)) {
256             return noBlock;
257         }
258
259         try {
260             final var publishThread = Thread.currentThread();
261             final var timerTask = observer.schedule(publishThread::interrupt, timeout, unit);
262             final var withBlock = putNotification(notification);
263             timerTask.cancel(true);
264             if (observer.getQueue().size() > 50) {
265                 observer.purge();
266             }
267             return withBlock;
268         } catch (InterruptedException e) {
269             return DOMNotificationPublishService.REJECTED;
270         }
271     }
272
273     @PreDestroy
274     @Deactivate
275     @Override
276     public void close() {
277         observer.shutdown();
278         executor.shutdown();
279         LOG.info("DOM Notification Router stopped");
280     }
281
282     @VisibleForTesting
283     ExecutorService executor() {
284         return executor;
285     }
286
287     @VisibleForTesting
288     ExecutorService observer() {
289         return observer;
290     }
291
292     @VisibleForTesting
293     Multimap<Absolute, ?> listeners() {
294         return listeners;
295     }
296
297     @VisibleForTesting
298     ListenerRegistry<DOMNotificationSubscriptionListener> subscriptionListeners() {
299         return subscriptionListeners;
300     }
301
302     private static void deliverEvents(final AbstractListenerRegistration<? extends DOMNotificationListener> reg,
303             final ImmutableList<DOMNotificationRouterEvent> events) {
304         if (reg.notClosed()) {
305             final var listener = reg.getInstance();
306             for (var event : events) {
307                 event.deliverTo(listener);
308             }
309         } else {
310             events.forEach(DOMNotificationRouterEvent::clear);
311         }
312     }
313 }