6b5e956d4572c655cf139b3c94de94be912e66be
[mdsal.git] / dom / mdsal-dom-broker / src / main / java / org / opendaylight / mdsal / dom / broker / DOMNotificationRouter.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.mdsal.dom.broker;
9
10 import com.google.common.annotations.VisibleForTesting;
11 import com.google.common.collect.ImmutableList;
12 import com.google.common.collect.ImmutableMultimap;
13 import com.google.common.collect.ImmutableSet;
14 import com.google.common.collect.Multimaps;
15 import com.google.common.util.concurrent.Futures;
16 import com.google.common.util.concurrent.ListenableFuture;
17 import com.google.common.util.concurrent.MoreExecutors;
18 import com.google.common.util.concurrent.ThreadFactoryBuilder;
19 import java.util.ArrayList;
20 import java.util.Collection;
21 import java.util.HashMap;
22 import java.util.List;
23 import java.util.Map;
24 import java.util.concurrent.ExecutorService;
25 import java.util.concurrent.Executors;
26 import java.util.concurrent.ScheduledThreadPoolExecutor;
27 import java.util.concurrent.TimeUnit;
28 import javax.annotation.PreDestroy;
29 import javax.inject.Inject;
30 import javax.inject.Singleton;
31 import org.eclipse.jdt.annotation.NonNull;
32 import org.opendaylight.mdsal.dom.api.DOMNotification;
33 import org.opendaylight.mdsal.dom.api.DOMNotificationListener;
34 import org.opendaylight.mdsal.dom.api.DOMNotificationPublishDemandExtension;
35 import org.opendaylight.mdsal.dom.api.DOMNotificationPublishDemandExtension.DemandListener;
36 import org.opendaylight.mdsal.dom.api.DOMNotificationPublishService;
37 import org.opendaylight.mdsal.dom.api.DOMNotificationService;
38 import org.opendaylight.yangtools.concepts.AbstractObjectRegistration;
39 import org.opendaylight.yangtools.concepts.AbstractRegistration;
40 import org.opendaylight.yangtools.concepts.Registration;
41 import org.opendaylight.yangtools.util.ObjectRegistry;
42 import org.opendaylight.yangtools.util.concurrent.EqualityQueuedNotificationManager;
43 import org.opendaylight.yangtools.util.concurrent.QueuedNotificationManager;
44 import org.opendaylight.yangtools.yang.common.Empty;
45 import org.opendaylight.yangtools.yang.model.api.stmt.SchemaNodeIdentifier.Absolute;
46 import org.osgi.service.component.annotations.Activate;
47 import org.osgi.service.component.annotations.Component;
48 import org.osgi.service.component.annotations.Deactivate;
49 import org.osgi.service.metatype.annotations.AttributeDefinition;
50 import org.osgi.service.metatype.annotations.Designate;
51 import org.osgi.service.metatype.annotations.ObjectClassDefinition;
52 import org.slf4j.Logger;
53 import org.slf4j.LoggerFactory;
54
55 /**
56  * Joint implementation of {@link DOMNotificationPublishService} and {@link DOMNotificationService}. Provides
57  * routing of notifications from publishers to subscribers.
58  *
59  *<p>
60  * Internal implementation one by using a {@link QueuedNotificationManager}.
61  *</p>
62  */
63 @Singleton
64 @Component(configurationPid = "org.opendaylight.mdsal.dom.notification", service = DOMNotificationRouter.class)
65 @Designate(ocd = DOMNotificationRouter.Config.class)
66 // Non-final for testing
67 public class DOMNotificationRouter implements AutoCloseable {
68     @ObjectClassDefinition()
69     public @interface Config {
70         @AttributeDefinition(name = "notification-queue-depth")
71         int queueDepth() default 65536;
72     }
73
74     @VisibleForTesting
75     abstract static sealed class Reg extends AbstractObjectRegistration<DOMNotificationListener> {
76         Reg(final @NonNull DOMNotificationListener listener) {
77             super(listener);
78         }
79     }
80
81     private final class SingleReg extends Reg {
82         SingleReg(final @NonNull DOMNotificationListener listener) {
83             super(listener);
84         }
85
86         @Override
87         protected void removeRegistration() {
88             DOMNotificationRouter.this.removeRegistration(this);
89         }
90     }
91
92     private static final class ComponentReg extends Reg {
93         ComponentReg(final @NonNull DOMNotificationListener listener) {
94             super(listener);
95         }
96
97         @Override
98         protected void removeRegistration() {
99             // No-op
100         }
101     }
102
103     private final class PublishFacade implements DOMNotificationPublishService, DOMNotificationPublishDemandExtension {
104         @Override
105         public List<Extension> supportedExtensions() {
106             return List.of(this);
107         }
108
109         @Override
110         public ListenableFuture<? extends Object> putNotification(final DOMNotification notification)
111                 throws InterruptedException {
112             return putNotificationImpl(notification);
113         }
114
115         @Override
116         public ListenableFuture<? extends Object> offerNotification(final DOMNotification notification) {
117             final var subscribers = listeners.get(notification.getType());
118             return subscribers.isEmpty() ? NO_LISTENERS : publish(notification, subscribers);
119         }
120
121         @Override
122         public ListenableFuture<? extends Object> offerNotification(final DOMNotification notification,
123                 final long timeout, final TimeUnit unit) throws InterruptedException {
124             final var subscribers = listeners.get(notification.getType());
125             if (subscribers.isEmpty()) {
126                 return NO_LISTENERS;
127             }
128             // Attempt to perform a non-blocking publish first
129             final var noBlock = publish(notification, subscribers);
130             if (!DOMNotificationPublishService.REJECTED.equals(noBlock)) {
131                 return noBlock;
132             }
133
134             try {
135                 final var publishThread = Thread.currentThread();
136                 final var timerTask = observer.schedule(publishThread::interrupt, timeout, unit);
137                 final var withBlock = putNotificationImpl(notification);
138                 timerTask.cancel(true);
139                 if (observer.getQueue().size() > 50) {
140                     observer.purge();
141                 }
142                 return withBlock;
143             } catch (InterruptedException e) {
144                 return DOMNotificationPublishService.REJECTED;
145             }
146         }
147
148         @Override
149         public Registration registerDemandListener(final DemandListener listener) {
150             final var initialTypes = listeners.keySet();
151             executor.execute(() -> listener.onDemandUpdated(initialTypes));
152             return demandListeners.register(listener);
153         }
154     }
155
156     private final class SubscribeFacade implements DOMNotificationService {
157         @Override
158         public Registration registerNotificationListener(final DOMNotificationListener listener,
159                 final Collection<Absolute> types) {
160             synchronized (DOMNotificationRouter.this) {
161                 final var reg = new SingleReg(listener);
162
163                 if (!types.isEmpty()) {
164                     final var b = ImmutableMultimap.<Absolute, Reg>builder();
165                     b.putAll(listeners);
166
167                     for (var t : types) {
168                         b.put(t, reg);
169                     }
170
171                     replaceListeners(b.build());
172                 }
173
174                 return reg;
175             }
176         }
177
178         @Override
179         public synchronized Registration registerNotificationListeners(
180                 final Map<Absolute, DOMNotificationListener> typeToListener) {
181             synchronized (DOMNotificationRouter.this) {
182                 final var b = ImmutableMultimap.<Absolute, Reg>builder();
183                 b.putAll(listeners);
184
185                 final var tmp = new HashMap<DOMNotificationListener, ComponentReg>();
186                 for (var e : typeToListener.entrySet()) {
187                     b.put(e.getKey(), tmp.computeIfAbsent(e.getValue(), ComponentReg::new));
188                 }
189                 replaceListeners(b.build());
190
191                 final var regs = List.copyOf(tmp.values());
192                 return new AbstractRegistration() {
193                     @Override
194                     protected void removeRegistration() {
195                         regs.forEach(ComponentReg::close);
196                         removeRegistrations(regs);
197                     }
198                 };
199             }
200         }
201     }
202
203     private static final Logger LOG = LoggerFactory.getLogger(DOMNotificationRouter.class);
204     private static final @NonNull ListenableFuture<?> NO_LISTENERS = Futures.immediateFuture(Empty.value());
205
206     private final EqualityQueuedNotificationManager<Reg, DOMNotificationRouterEvent> queueNotificationManager;
207     private final @NonNull DOMNotificationPublishService notificationPublishService = new PublishFacade();
208     private final @NonNull DOMNotificationService notificationService = new SubscribeFacade();
209     private final ObjectRegistry<DemandListener> demandListeners =
210         ObjectRegistry.createConcurrent("notification demand listeners");
211     private final ScheduledThreadPoolExecutor observer;
212     private final ExecutorService executor;
213
214     private volatile ImmutableMultimap<Absolute, Reg> listeners = ImmutableMultimap.of();
215
216     @Inject
217     public DOMNotificationRouter(final int maxQueueCapacity) {
218         observer = new ScheduledThreadPoolExecutor(1, new ThreadFactoryBuilder()
219             .setDaemon(true)
220             .setNameFormat("DOMNotificationRouter-observer-%d")
221             .build());
222         executor = Executors.newCachedThreadPool(new ThreadFactoryBuilder()
223             .setDaemon(true)
224             .setNameFormat("DOMNotificationRouter-listeners-%d")
225             .build());
226         queueNotificationManager = new EqualityQueuedNotificationManager<>("DOMNotificationRouter", executor,
227                 maxQueueCapacity, DOMNotificationRouter::deliverEvents);
228         LOG.info("DOM Notification Router started");
229     }
230
231     @Activate
232     public DOMNotificationRouter(final Config config) {
233         this(config.queueDepth());
234     }
235
236     public @NonNull DOMNotificationService notificationService() {
237         return notificationService;
238     }
239
240     public @NonNull DOMNotificationPublishService notificationPublishService() {
241         return notificationPublishService;
242     }
243
244     private synchronized void removeRegistration(final SingleReg reg) {
245         replaceListeners(ImmutableMultimap.copyOf(Multimaps.filterValues(listeners, input -> input != reg)));
246     }
247
248     private synchronized void removeRegistrations(final List<ComponentReg> regs) {
249         replaceListeners(ImmutableMultimap.copyOf(Multimaps.filterValues(listeners, input -> !regs.contains(input))));
250     }
251
252     /**
253      * Swaps registered listeners and triggers notification update.
254      *
255      * @param newListeners is used to notify listenerTypes changed
256      */
257     private void replaceListeners(final ImmutableMultimap<Absolute, Reg> newListeners) {
258         listeners = newListeners;
259         notifyListenerTypesChanged(newListeners.keySet());
260     }
261
262     @SuppressWarnings("checkstyle:IllegalCatch")
263     private void notifyListenerTypesChanged(final @NonNull ImmutableSet<Absolute> typesAfter) {
264         final var listenersAfter = demandListeners.streamObjects().collect(ImmutableList.toImmutableList());
265         executor.execute(() -> {
266             for (var listener : listenersAfter) {
267                 try {
268                     listener.onDemandUpdated(typesAfter);
269                 } catch (final Exception e) {
270                     LOG.warn("Uncaught exception during invoking listener {}", listener, e);
271                 }
272             }
273         });
274     }
275
276     @VisibleForTesting
277     @NonNull ListenableFuture<? extends Object> putNotificationImpl(final DOMNotification notification)
278             throws InterruptedException {
279         final var subscribers = listeners.get(notification.getType());
280         return subscribers.isEmpty() ? NO_LISTENERS : publish(notification, subscribers);
281     }
282
283     @VisibleForTesting
284     @NonNull ListenableFuture<?> publish(final DOMNotification notification, final Collection<Reg> subscribers) {
285         final var futures = new ArrayList<ListenableFuture<?>>(subscribers.size());
286         subscribers.forEach(subscriber -> {
287             final var event = new DOMNotificationRouterEvent(notification);
288             futures.add(event.future());
289             queueNotificationManager.submitNotification(subscriber, event);
290         });
291         return Futures.transform(Futures.successfulAsList(futures), ignored -> Empty.value(),
292             MoreExecutors.directExecutor());
293     }
294
295     @PreDestroy
296     @Deactivate
297     @Override
298     public void close() {
299         observer.shutdown();
300         executor.shutdown();
301         LOG.info("DOM Notification Router stopped");
302     }
303
304     @VisibleForTesting
305     ExecutorService executor() {
306         return executor;
307     }
308
309     @VisibleForTesting
310     ExecutorService observer() {
311         return observer;
312     }
313
314     @VisibleForTesting
315     ImmutableMultimap<Absolute, ?> listeners() {
316         return listeners;
317     }
318
319     @VisibleForTesting
320     ObjectRegistry<DemandListener> demandListeners() {
321         return demandListeners;
322     }
323
324     private static void deliverEvents(final Reg reg, final ImmutableList<DOMNotificationRouterEvent> events) {
325         if (reg.notClosed()) {
326             final var listener = reg.getInstance();
327             for (var event : events) {
328                 event.deliverTo(listener);
329             }
330         } else {
331             events.forEach(DOMNotificationRouterEvent::clear);
332         }
333     }
334 }