Reduce ObjectRegistration use
[mdsal.git] / dom / mdsal-dom-broker / src / main / java / org / opendaylight / mdsal / dom / broker / DOMNotificationRouter.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.mdsal.dom.broker;
9
10 import static java.util.Objects.requireNonNull;
11
12 import com.google.common.annotations.VisibleForTesting;
13 import com.google.common.collect.ImmutableList;
14 import com.google.common.collect.ImmutableMultimap;
15 import com.google.common.collect.ImmutableSet;
16 import com.google.common.collect.Multimaps;
17 import com.google.common.util.concurrent.Futures;
18 import com.google.common.util.concurrent.ListenableFuture;
19 import com.google.common.util.concurrent.MoreExecutors;
20 import com.google.common.util.concurrent.ThreadFactoryBuilder;
21 import java.util.ArrayList;
22 import java.util.Collection;
23 import java.util.HashMap;
24 import java.util.List;
25 import java.util.Map;
26 import java.util.concurrent.ExecutorService;
27 import java.util.concurrent.Executors;
28 import java.util.concurrent.ScheduledThreadPoolExecutor;
29 import java.util.concurrent.TimeUnit;
30 import javax.annotation.PreDestroy;
31 import javax.inject.Inject;
32 import javax.inject.Singleton;
33 import org.eclipse.jdt.annotation.NonNull;
34 import org.opendaylight.mdsal.dom.api.DOMNotification;
35 import org.opendaylight.mdsal.dom.api.DOMNotificationListener;
36 import org.opendaylight.mdsal.dom.api.DOMNotificationPublishDemandExtension;
37 import org.opendaylight.mdsal.dom.api.DOMNotificationPublishDemandExtension.DemandListener;
38 import org.opendaylight.mdsal.dom.api.DOMNotificationPublishService;
39 import org.opendaylight.mdsal.dom.api.DOMNotificationService;
40 import org.opendaylight.yangtools.concepts.AbstractRegistration;
41 import org.opendaylight.yangtools.concepts.Registration;
42 import org.opendaylight.yangtools.util.ObjectRegistry;
43 import org.opendaylight.yangtools.util.concurrent.EqualityQueuedNotificationManager;
44 import org.opendaylight.yangtools.util.concurrent.QueuedNotificationManager;
45 import org.opendaylight.yangtools.yang.common.Empty;
46 import org.opendaylight.yangtools.yang.model.api.stmt.SchemaNodeIdentifier.Absolute;
47 import org.osgi.service.component.annotations.Activate;
48 import org.osgi.service.component.annotations.Component;
49 import org.osgi.service.component.annotations.Deactivate;
50 import org.osgi.service.metatype.annotations.AttributeDefinition;
51 import org.osgi.service.metatype.annotations.Designate;
52 import org.osgi.service.metatype.annotations.ObjectClassDefinition;
53 import org.slf4j.Logger;
54 import org.slf4j.LoggerFactory;
55
56 /**
57  * Joint implementation of {@link DOMNotificationPublishService} and {@link DOMNotificationService}. Provides
58  * routing of notifications from publishers to subscribers.
59  *
60  *<p>
61  * Internal implementation is done using a {@link QueuedNotificationManager}.
62  *</p>
63  */
64 @Singleton
65 @Component(configurationPid = "org.opendaylight.mdsal.dom.notification", service = DOMNotificationRouter.class)
66 @Designate(ocd = DOMNotificationRouter.Config.class)
67 // Non-final for testing
68 public class DOMNotificationRouter implements AutoCloseable {
69     @ObjectClassDefinition()
70     public @interface Config {
71         @AttributeDefinition(name = "notification-queue-depth")
72         int queueDepth() default 65536;
73     }
74
75     @VisibleForTesting
76     abstract static sealed class Reg extends AbstractRegistration {
77         private final @NonNull DOMNotificationListener listener;
78
79         Reg(final @NonNull DOMNotificationListener listener) {
80             this.listener = requireNonNull(listener);
81         }
82     }
83
84     private final class SingleReg extends Reg {
85         SingleReg(final @NonNull DOMNotificationListener listener) {
86             super(listener);
87         }
88
89         @Override
90         protected void removeRegistration() {
91             DOMNotificationRouter.this.removeRegistration(this);
92         }
93     }
94
95     private static final class ComponentReg extends Reg {
96         ComponentReg(final @NonNull DOMNotificationListener listener) {
97             super(listener);
98         }
99
100         @Override
101         protected void removeRegistration() {
102             // No-op
103         }
104     }
105
106     private final class PublishFacade implements DOMNotificationPublishService, DOMNotificationPublishDemandExtension {
107         @Override
108         public List<Extension> supportedExtensions() {
109             return List.of(this);
110         }
111
112         @Override
113         public ListenableFuture<? extends Object> putNotification(final DOMNotification notification)
114                 throws InterruptedException {
115             return putNotificationImpl(notification);
116         }
117
118         @Override
119         public ListenableFuture<? extends Object> offerNotification(final DOMNotification notification) {
120             final var subscribers = listeners.get(notification.getType());
121             return subscribers.isEmpty() ? NO_LISTENERS : publish(notification, subscribers);
122         }
123
124         @Override
125         public ListenableFuture<? extends Object> offerNotification(final DOMNotification notification,
126                 final long timeout, final TimeUnit unit) throws InterruptedException {
127             final var subscribers = listeners.get(notification.getType());
128             if (subscribers.isEmpty()) {
129                 return NO_LISTENERS;
130             }
131             // Attempt to perform a non-blocking publish first
132             final var noBlock = publish(notification, subscribers);
133             if (!DOMNotificationPublishService.REJECTED.equals(noBlock)) {
134                 return noBlock;
135             }
136
137             try {
138                 final var publishThread = Thread.currentThread();
139                 final var timerTask = observer.schedule(publishThread::interrupt, timeout, unit);
140                 final var withBlock = putNotificationImpl(notification);
141                 timerTask.cancel(true);
142                 if (observer.getQueue().size() > 50) {
143                     observer.purge();
144                 }
145                 return withBlock;
146             } catch (InterruptedException e) {
147                 return DOMNotificationPublishService.REJECTED;
148             }
149         }
150
151         @Override
152         public Registration registerDemandListener(final DemandListener listener) {
153             final var initialTypes = listeners.keySet();
154             executor.execute(() -> listener.onDemandUpdated(initialTypes));
155             return demandListeners.register(listener);
156         }
157     }
158
159     private final class SubscribeFacade implements DOMNotificationService {
160         @Override
161         public Registration registerNotificationListener(final DOMNotificationListener listener,
162                 final Collection<Absolute> types) {
163             synchronized (DOMNotificationRouter.this) {
164                 final var reg = new SingleReg(listener);
165
166                 if (!types.isEmpty()) {
167                     final var b = ImmutableMultimap.<Absolute, Reg>builder();
168                     b.putAll(listeners);
169
170                     for (var t : types) {
171                         b.put(t, reg);
172                     }
173
174                     replaceListeners(b.build());
175                 }
176
177                 return reg;
178             }
179         }
180
181         @Override
182         public synchronized Registration registerNotificationListeners(
183                 final Map<Absolute, DOMNotificationListener> typeToListener) {
184             synchronized (DOMNotificationRouter.this) {
185                 final var b = ImmutableMultimap.<Absolute, Reg>builder();
186                 b.putAll(listeners);
187
188                 final var tmp = new HashMap<DOMNotificationListener, ComponentReg>();
189                 for (var e : typeToListener.entrySet()) {
190                     b.put(e.getKey(), tmp.computeIfAbsent(e.getValue(), ComponentReg::new));
191                 }
192                 replaceListeners(b.build());
193
194                 final var regs = List.copyOf(tmp.values());
195                 return new AbstractRegistration() {
196                     @Override
197                     protected void removeRegistration() {
198                         regs.forEach(ComponentReg::close);
199                         removeRegistrations(regs);
200                     }
201                 };
202             }
203         }
204     }
205
206     private static final Logger LOG = LoggerFactory.getLogger(DOMNotificationRouter.class);
207     private static final @NonNull ListenableFuture<?> NO_LISTENERS = Futures.immediateFuture(Empty.value());
208
209     private final EqualityQueuedNotificationManager<Reg, DOMNotificationRouterEvent> queueNotificationManager;
210     private final @NonNull DOMNotificationPublishService notificationPublishService = new PublishFacade();
211     private final @NonNull DOMNotificationService notificationService = new SubscribeFacade();
212     private final ObjectRegistry<DemandListener> demandListeners =
213         ObjectRegistry.createConcurrent("notification demand listeners");
214     private final ScheduledThreadPoolExecutor observer;
215     private final ExecutorService executor;
216
217     private volatile ImmutableMultimap<Absolute, Reg> listeners = ImmutableMultimap.of();
218
219     @Inject
220     public DOMNotificationRouter(final int maxQueueCapacity) {
221         observer = new ScheduledThreadPoolExecutor(1, new ThreadFactoryBuilder()
222             .setDaemon(true)
223             .setNameFormat("DOMNotificationRouter-observer-%d")
224             .build());
225         executor = Executors.newCachedThreadPool(new ThreadFactoryBuilder()
226             .setDaemon(true)
227             .setNameFormat("DOMNotificationRouter-listeners-%d")
228             .build());
229         queueNotificationManager = new EqualityQueuedNotificationManager<>("DOMNotificationRouter", executor,
230                 maxQueueCapacity, DOMNotificationRouter::deliverEvents);
231         LOG.info("DOM Notification Router started");
232     }
233
234     @Activate
235     public DOMNotificationRouter(final Config config) {
236         this(config.queueDepth());
237     }
238
239     public @NonNull DOMNotificationService notificationService() {
240         return notificationService;
241     }
242
243     public @NonNull DOMNotificationPublishService notificationPublishService() {
244         return notificationPublishService;
245     }
246
247     private synchronized void removeRegistration(final SingleReg reg) {
248         replaceListeners(ImmutableMultimap.copyOf(Multimaps.filterValues(listeners, input -> input != reg)));
249     }
250
251     private synchronized void removeRegistrations(final List<ComponentReg> regs) {
252         replaceListeners(ImmutableMultimap.copyOf(Multimaps.filterValues(listeners, input -> !regs.contains(input))));
253     }
254
255     /**
256      * Swaps registered listeners and triggers notification update.
257      *
258      * @param newListeners is used to notify listenerTypes changed
259      */
260     private void replaceListeners(final ImmutableMultimap<Absolute, Reg> newListeners) {
261         listeners = newListeners;
262         notifyListenerTypesChanged(newListeners.keySet());
263     }
264
265     @SuppressWarnings("checkstyle:IllegalCatch")
266     private void notifyListenerTypesChanged(final @NonNull ImmutableSet<Absolute> typesAfter) {
267         final var listenersAfter = demandListeners.streamObjects().collect(ImmutableList.toImmutableList());
268         executor.execute(() -> {
269             for (var listener : listenersAfter) {
270                 try {
271                     listener.onDemandUpdated(typesAfter);
272                 } catch (final Exception e) {
273                     LOG.warn("Uncaught exception during invoking listener {}", listener, e);
274                 }
275             }
276         });
277     }
278
279     @VisibleForTesting
280     @NonNull ListenableFuture<? extends Object> putNotificationImpl(final DOMNotification notification)
281             throws InterruptedException {
282         final var subscribers = listeners.get(notification.getType());
283         return subscribers.isEmpty() ? NO_LISTENERS : publish(notification, subscribers);
284     }
285
286     @VisibleForTesting
287     @NonNull ListenableFuture<?> publish(final DOMNotification notification, final Collection<Reg> subscribers) {
288         final var futures = new ArrayList<ListenableFuture<?>>(subscribers.size());
289         subscribers.forEach(subscriber -> {
290             final var event = new DOMNotificationRouterEvent(notification);
291             futures.add(event.future());
292             queueNotificationManager.submitNotification(subscriber, event);
293         });
294         return Futures.transform(Futures.successfulAsList(futures), ignored -> Empty.value(),
295             MoreExecutors.directExecutor());
296     }
297
298     @PreDestroy
299     @Deactivate
300     @Override
301     public void close() {
302         observer.shutdown();
303         executor.shutdown();
304         LOG.info("DOM Notification Router stopped");
305     }
306
307     @VisibleForTesting
308     ExecutorService executor() {
309         return executor;
310     }
311
312     @VisibleForTesting
313     ExecutorService observer() {
314         return observer;
315     }
316
317     @VisibleForTesting
318     ImmutableMultimap<Absolute, ?> listeners() {
319         return listeners;
320     }
321
322     @VisibleForTesting
323     ObjectRegistry<DemandListener> demandListeners() {
324         return demandListeners;
325     }
326
327     private static void deliverEvents(final Reg reg, final ImmutableList<DOMNotificationRouterEvent> events) {
328         if (reg.notClosed()) {
329             final var listener = reg.listener;
330             for (var event : events) {
331                 event.deliverTo(listener);
332             }
333         } else {
334             events.forEach(DOMNotificationRouterEvent::clear);
335         }
336     }
337 }