5e739888307dd06d025c7b09fba27daa1fd47821
[mdsal.git] / dom / mdsal-dom-broker / src / main / java / org / opendaylight / mdsal / dom / broker / DOMNotificationRouter.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.mdsal.dom.broker;
9
10 import com.google.common.annotations.VisibleForTesting;
11 import com.google.common.collect.ImmutableList;
12 import com.google.common.collect.ImmutableMultimap;
13 import com.google.common.collect.Multimap;
14 import com.google.common.collect.Multimaps;
15 import com.google.common.util.concurrent.Futures;
16 import com.google.common.util.concurrent.ListenableFuture;
17 import com.google.common.util.concurrent.MoreExecutors;
18 import com.google.common.util.concurrent.ThreadFactoryBuilder;
19 import java.util.ArrayList;
20 import java.util.Collection;
21 import java.util.HashMap;
22 import java.util.List;
23 import java.util.Map;
24 import java.util.Set;
25 import java.util.concurrent.ExecutorService;
26 import java.util.concurrent.Executors;
27 import java.util.concurrent.ScheduledFuture;
28 import java.util.concurrent.ScheduledThreadPoolExecutor;
29 import java.util.concurrent.TimeUnit;
30 import javax.annotation.PreDestroy;
31 import javax.inject.Inject;
32 import org.eclipse.jdt.annotation.NonNull;
33 import org.opendaylight.mdsal.dom.api.DOMNotification;
34 import org.opendaylight.mdsal.dom.api.DOMNotificationListener;
35 import org.opendaylight.mdsal.dom.api.DOMNotificationPublishService;
36 import org.opendaylight.mdsal.dom.api.DOMNotificationService;
37 import org.opendaylight.mdsal.dom.spi.DOMNotificationSubscriptionListener;
38 import org.opendaylight.mdsal.dom.spi.DOMNotificationSubscriptionListenerRegistry;
39 import org.opendaylight.yangtools.concepts.AbstractListenerRegistration;
40 import org.opendaylight.yangtools.concepts.AbstractRegistration;
41 import org.opendaylight.yangtools.concepts.ListenerRegistration;
42 import org.opendaylight.yangtools.concepts.Registration;
43 import org.opendaylight.yangtools.util.ListenerRegistry;
44 import org.opendaylight.yangtools.util.concurrent.EqualityQueuedNotificationManager;
45 import org.opendaylight.yangtools.util.concurrent.FluentFutures;
46 import org.opendaylight.yangtools.util.concurrent.QueuedNotificationManager;
47 import org.opendaylight.yangtools.yang.model.api.stmt.SchemaNodeIdentifier.Absolute;
48 import org.osgi.service.component.annotations.Activate;
49 import org.osgi.service.component.annotations.Component;
50 import org.osgi.service.component.annotations.Deactivate;
51 import org.osgi.service.metatype.annotations.AttributeDefinition;
52 import org.osgi.service.metatype.annotations.Designate;
53 import org.osgi.service.metatype.annotations.ObjectClassDefinition;
54 import org.slf4j.Logger;
55 import org.slf4j.LoggerFactory;
56
57 /**
58  * Joint implementation of {@link DOMNotificationPublishService} and {@link DOMNotificationService}. Provides
59  * routing of notifications from publishers to subscribers.
60  *
61  *<p>
62  * Internal implementation one by using a {@link QueuedNotificationManager}.
63  *</p>
64  */
65 @Component(immediate = true, configurationPid = "org.opendaylight.mdsal.dom.notification", service = {
66     DOMNotificationService.class, DOMNotificationPublishService.class,
67     DOMNotificationSubscriptionListenerRegistry.class
68 })
69 @Designate(ocd = DOMNotificationRouter.Config.class)
70 // Non-final for testing
71 public class DOMNotificationRouter implements AutoCloseable, DOMNotificationPublishService,
72         DOMNotificationService, DOMNotificationSubscriptionListenerRegistry {
73     @ObjectClassDefinition()
74     public @interface Config {
75         @AttributeDefinition(name = "notification-queue-depth")
76         int queueDepth() default 65536;
77     }
78
79     @VisibleForTesting
80     abstract static sealed class Reg<T extends DOMNotificationListener> extends AbstractListenerRegistration<T> {
81         Reg(final @NonNull T listener) {
82             super(listener);
83         }
84     }
85
86     private final class SingleReg<T extends DOMNotificationListener> extends Reg<T> {
87         SingleReg(final @NonNull T listener) {
88             super(listener);
89         }
90
91         @Override
92         protected void removeRegistration() {
93             DOMNotificationRouter.this.removeRegistration(this);
94         }
95     }
96
97     private static final class ComponentReg extends Reg<DOMNotificationListener> {
98         ComponentReg(final @NonNull DOMNotificationListener listener) {
99             super(listener);
100         }
101
102         @Override
103         protected void removeRegistration() {
104             // No-op
105         }
106     }
107
108     private static final Logger LOG = LoggerFactory.getLogger(DOMNotificationRouter.class);
109     private static final ListenableFuture<Void> NO_LISTENERS = FluentFutures.immediateNullFluentFuture();
110
111     private final ListenerRegistry<DOMNotificationSubscriptionListener> subscriptionListeners =
112             ListenerRegistry.create();
113     private final EqualityQueuedNotificationManager<AbstractListenerRegistration<? extends DOMNotificationListener>,
114                 DOMNotificationRouterEvent> queueNotificationManager;
115     private final ScheduledThreadPoolExecutor observer;
116     private final ExecutorService executor;
117
118     private volatile Multimap<Absolute, Reg<?>> listeners = ImmutableMultimap.of();
119
120     @Inject
121     public DOMNotificationRouter(final int maxQueueCapacity) {
122         observer = new ScheduledThreadPoolExecutor(1, new ThreadFactoryBuilder()
123             .setDaemon(true)
124             .setNameFormat("DOMNotificationRouter-observer-%d")
125             .build());
126         executor = Executors.newCachedThreadPool(new ThreadFactoryBuilder()
127             .setDaemon(true)
128             .setNameFormat("DOMNotificationRouter-listeners-%d")
129             .build());
130         queueNotificationManager = new EqualityQueuedNotificationManager<>("DOMNotificationRouter", executor,
131                 maxQueueCapacity, DOMNotificationRouter::deliverEvents);
132     }
133
134     @Activate
135     public DOMNotificationRouter(final Config config) {
136         this(config.queueDepth());
137         LOG.info("DOM Notification Router started");
138     }
139
140     @Deprecated(forRemoval = true)
141     public static DOMNotificationRouter create(final int maxQueueCapacity) {
142         return new DOMNotificationRouter(maxQueueCapacity);
143     }
144
145     @Override
146     public synchronized <T extends DOMNotificationListener> ListenerRegistration<T> registerNotificationListener(
147             final T listener, final Collection<Absolute> types) {
148         final var reg = new SingleReg<>(listener);
149
150         if (!types.isEmpty()) {
151             final var b = ImmutableMultimap.<Absolute, Reg<?>>builder();
152             b.putAll(listeners);
153
154             for (var t : types) {
155                 b.put(t, reg);
156             }
157
158             replaceListeners(b.build());
159         }
160
161         return reg;
162     }
163
164     @Override
165     public synchronized Registration registerNotificationListeners(
166             final Map<Absolute, DOMNotificationListener> typeToListener) {
167         final var b = ImmutableMultimap.<Absolute, Reg<?>>builder();
168         b.putAll(listeners);
169
170         final var tmp = new HashMap<DOMNotificationListener, ComponentReg>();
171         for (var e : typeToListener.entrySet()) {
172             b.put(e.getKey(), tmp.computeIfAbsent(e.getValue(), ComponentReg::new));
173         }
174
175         final var regs = List.copyOf(tmp.values());
176         return new AbstractRegistration() {
177             @Override
178             protected void removeRegistration() {
179                 regs.forEach(ComponentReg::close);
180                 removeRegistrations(regs);
181             }
182         };
183     }
184
185     private synchronized void removeRegistration(final SingleReg<?> reg) {
186         replaceListeners(ImmutableMultimap.copyOf(Multimaps.filterValues(listeners, input -> input != reg)));
187     }
188
189     private synchronized void removeRegistrations(final List<ComponentReg> regs) {
190         replaceListeners(ImmutableMultimap.copyOf(Multimaps.filterValues(listeners, input -> !regs.contains(input))));
191     }
192
193     /**
194      * Swaps registered listeners and triggers notification update.
195      *
196      * @param newListeners is used to notify listenerTypes changed
197      */
198     private void replaceListeners(final Multimap<Absolute, Reg<?>> newListeners) {
199         listeners = newListeners;
200         notifyListenerTypesChanged(newListeners.keySet());
201     }
202
203     @SuppressWarnings("checkstyle:IllegalCatch")
204     private void notifyListenerTypesChanged(final Set<Absolute> typesAfter) {
205         final List<? extends DOMNotificationSubscriptionListener> listenersAfter =
206                 subscriptionListeners.streamListeners().collect(ImmutableList.toImmutableList());
207         executor.execute(() -> {
208             for (final DOMNotificationSubscriptionListener subListener : listenersAfter) {
209                 try {
210                     subListener.onSubscriptionChanged(typesAfter);
211                 } catch (final Exception e) {
212                     LOG.warn("Uncaught exception during invoking listener {}", subListener, e);
213                 }
214             }
215         });
216     }
217
218     @Override
219     public <L extends DOMNotificationSubscriptionListener> ListenerRegistration<L> registerSubscriptionListener(
220             final L listener) {
221         final Set<Absolute> initialTypes = listeners.keySet();
222         executor.execute(() -> listener.onSubscriptionChanged(initialTypes));
223         return subscriptionListeners.register(listener);
224     }
225
226     @VisibleForTesting
227     ListenableFuture<? extends Object> publish(final DOMNotification notification,
228             final Collection<Reg<?>> subscribers) {
229         final List<ListenableFuture<Void>> futures = new ArrayList<>(subscribers.size());
230         subscribers.forEach(subscriber -> {
231             final DOMNotificationRouterEvent event = new DOMNotificationRouterEvent(notification);
232             futures.add(event.future());
233             queueNotificationManager.submitNotification(subscriber, event);
234         });
235         return Futures.transform(Futures.successfulAsList(futures), ignored -> (Void)null,
236             MoreExecutors.directExecutor());
237     }
238
239     @Override
240     public ListenableFuture<? extends Object> putNotification(final DOMNotification notification)
241             throws InterruptedException {
242         final var subscribers = listeners.get(notification.getType());
243         if (subscribers.isEmpty()) {
244             return NO_LISTENERS;
245         }
246
247         return publish(notification, subscribers);
248     }
249
250     @Override
251     public ListenableFuture<? extends Object> offerNotification(final DOMNotification notification) {
252         final var subscribers = listeners.get(notification.getType());
253         if (subscribers.isEmpty()) {
254             return NO_LISTENERS;
255         }
256
257         return publish(notification, subscribers);
258     }
259
260     @Override
261     public ListenableFuture<? extends Object> offerNotification(final DOMNotification notification, final long timeout,
262             final TimeUnit unit) throws InterruptedException {
263         final var subscribers = listeners.get(notification.getType());
264         if (subscribers.isEmpty()) {
265             return NO_LISTENERS;
266         }
267         // Attempt to perform a non-blocking publish first
268         final ListenableFuture<?> noBlock = publish(notification, subscribers);
269         if (!DOMNotificationPublishService.REJECTED.equals(noBlock)) {
270             return noBlock;
271         }
272
273         try {
274             final Thread publishThread = Thread.currentThread();
275             ScheduledFuture<?> timerTask = observer.schedule(publishThread::interrupt, timeout, unit);
276             final ListenableFuture<?> withBlock = putNotification(notification);
277             timerTask.cancel(true);
278             if (observer.getQueue().size() > 50) {
279                 observer.purge();
280             }
281             return withBlock;
282         } catch (InterruptedException e) {
283             return DOMNotificationPublishService.REJECTED;
284         }
285     }
286
287     @PreDestroy
288     @Deactivate
289     @Override
290     public void close() {
291         observer.shutdown();
292         executor.shutdown();
293         LOG.info("DOM Notification Router stopped");
294     }
295
296     @VisibleForTesting
297     ExecutorService executor() {
298         return executor;
299     }
300
301     @VisibleForTesting
302     ExecutorService observer() {
303         return observer;
304     }
305
306     @VisibleForTesting
307     Multimap<Absolute, ?> listeners() {
308         return listeners;
309     }
310
311     @VisibleForTesting
312     ListenerRegistry<DOMNotificationSubscriptionListener> subscriptionListeners() {
313         return subscriptionListeners;
314     }
315
316     private static void deliverEvents(final AbstractListenerRegistration<? extends DOMNotificationListener> reg,
317             final ImmutableList<DOMNotificationRouterEvent> events) {
318         if (reg.notClosed()) {
319             final DOMNotificationListener listener = reg.getInstance();
320             for (DOMNotificationRouterEvent event : events) {
321                 event.deliverTo(listener);
322             }
323         } else {
324             events.forEach(DOMNotificationRouterEvent::clear);
325         }
326     }
327 }