35db4b41f59a97f5e302abfcff184fead356549a
[mdsal.git] / dom / mdsal-dom-broker / src / main / java / org / opendaylight / mdsal / dom / broker / DOMNotificationRouter.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.mdsal.dom.broker;
9
10 import com.google.common.annotations.VisibleForTesting;
11 import com.google.common.collect.ImmutableList;
12 import com.google.common.collect.ImmutableMultimap;
13 import com.google.common.collect.Multimap;
14 import com.google.common.collect.Multimaps;
15 import com.google.common.util.concurrent.Futures;
16 import com.google.common.util.concurrent.ListenableFuture;
17 import com.google.common.util.concurrent.MoreExecutors;
18 import com.google.common.util.concurrent.ThreadFactoryBuilder;
19 import java.util.ArrayList;
20 import java.util.Collection;
21 import java.util.List;
22 import java.util.Set;
23 import java.util.concurrent.ExecutorService;
24 import java.util.concurrent.Executors;
25 import java.util.concurrent.ScheduledFuture;
26 import java.util.concurrent.ScheduledThreadPoolExecutor;
27 import java.util.concurrent.TimeUnit;
28 import javax.annotation.PreDestroy;
29 import javax.inject.Inject;
30 import org.eclipse.jdt.annotation.NonNull;
31 import org.opendaylight.mdsal.dom.api.DOMNotification;
32 import org.opendaylight.mdsal.dom.api.DOMNotificationListener;
33 import org.opendaylight.mdsal.dom.api.DOMNotificationPublishService;
34 import org.opendaylight.mdsal.dom.api.DOMNotificationService;
35 import org.opendaylight.mdsal.dom.spi.DOMNotificationSubscriptionListener;
36 import org.opendaylight.mdsal.dom.spi.DOMNotificationSubscriptionListenerRegistry;
37 import org.opendaylight.yangtools.concepts.AbstractListenerRegistration;
38 import org.opendaylight.yangtools.concepts.ListenerRegistration;
39 import org.opendaylight.yangtools.util.ListenerRegistry;
40 import org.opendaylight.yangtools.util.concurrent.EqualityQueuedNotificationManager;
41 import org.opendaylight.yangtools.util.concurrent.FluentFutures;
42 import org.opendaylight.yangtools.util.concurrent.QueuedNotificationManager;
43 import org.opendaylight.yangtools.yang.model.api.stmt.SchemaNodeIdentifier.Absolute;
44 import org.osgi.service.component.annotations.Activate;
45 import org.osgi.service.component.annotations.Component;
46 import org.osgi.service.component.annotations.Deactivate;
47 import org.osgi.service.metatype.annotations.AttributeDefinition;
48 import org.osgi.service.metatype.annotations.Designate;
49 import org.osgi.service.metatype.annotations.ObjectClassDefinition;
50 import org.slf4j.Logger;
51 import org.slf4j.LoggerFactory;
52
53 /**
54  * Joint implementation of {@link DOMNotificationPublishService} and {@link DOMNotificationService}. Provides
55  * routing of notifications from publishers to subscribers.
56  *
57  *<p>
58  * Internal implementation one by using a {@link QueuedNotificationManager}.
59  *</p>
60  */
61 @Component(immediate = true, configurationPid = "org.opendaylight.mdsal.dom.notification", service = {
62     DOMNotificationService.class, DOMNotificationPublishService.class,
63     DOMNotificationSubscriptionListenerRegistry.class
64 })
65 @Designate(ocd = DOMNotificationRouter.Config.class)
66 // Non-final for testing
67 public class DOMNotificationRouter implements AutoCloseable, DOMNotificationPublishService,
68         DOMNotificationService, DOMNotificationSubscriptionListenerRegistry {
69     @ObjectClassDefinition()
70     public @interface Config {
71         @AttributeDefinition(name = "notification-queue-depth")
72         int queueDepth() default 65536;
73     }
74
75     @VisibleForTesting
76     final class Reg<T extends DOMNotificationListener> extends AbstractListenerRegistration<T> {
77         private Reg(final @NonNull T listener) {
78             super(listener);
79         }
80
81         @Override
82         protected void removeRegistration() {
83             DOMNotificationRouter.this.removeRegistration(this);
84         }
85     }
86
87     private static final Logger LOG = LoggerFactory.getLogger(DOMNotificationRouter.class);
88     private static final ListenableFuture<Void> NO_LISTENERS = FluentFutures.immediateNullFluentFuture();
89
90     private final ListenerRegistry<DOMNotificationSubscriptionListener> subscriptionListeners =
91             ListenerRegistry.create();
92     private final EqualityQueuedNotificationManager<AbstractListenerRegistration<? extends DOMNotificationListener>,
93                 DOMNotificationRouterEvent> queueNotificationManager;
94     private final ScheduledThreadPoolExecutor observer;
95     private final ExecutorService executor;
96
97     private volatile Multimap<Absolute, Reg<?>> listeners = ImmutableMultimap.of();
98
99     @Inject
100     public DOMNotificationRouter(final int maxQueueCapacity) {
101         observer = new ScheduledThreadPoolExecutor(1, new ThreadFactoryBuilder()
102             .setDaemon(true)
103             .setNameFormat("DOMNotificationRouter-observer-%d")
104             .build());
105         executor = Executors.newCachedThreadPool(new ThreadFactoryBuilder()
106             .setDaemon(true)
107             .setNameFormat("DOMNotificationRouter-listeners-%d")
108             .build());
109         queueNotificationManager = new EqualityQueuedNotificationManager<>("DOMNotificationRouter", executor,
110                 maxQueueCapacity, DOMNotificationRouter::deliverEvents);
111     }
112
113     @Activate
114     public DOMNotificationRouter(final Config config) {
115         this(config.queueDepth());
116         LOG.info("DOM Notification Router started");
117     }
118
119     @Deprecated(forRemoval = true)
120     public static DOMNotificationRouter create(final int maxQueueCapacity) {
121         return new DOMNotificationRouter(maxQueueCapacity);
122     }
123
124     @Override
125     public synchronized <T extends DOMNotificationListener> ListenerRegistration<T> registerNotificationListener(
126             final T listener, final Collection<Absolute> types) {
127         final var reg = new Reg<>(listener);
128
129         if (!types.isEmpty()) {
130             final var b = ImmutableMultimap.<Absolute, Reg<?>>builder();
131             b.putAll(listeners);
132
133             for (final Absolute t : types) {
134                 b.put(t, reg);
135             }
136
137             replaceListeners(b.build());
138         }
139
140         return reg;
141     }
142
143     private synchronized void removeRegistration(final Reg<?> reg) {
144         replaceListeners(ImmutableMultimap.copyOf(Multimaps.filterValues(listeners, input -> input != reg)));
145     }
146
147     /**
148      * Swaps registered listeners and triggers notification update.
149      *
150      * @param newListeners is used to notify listenerTypes changed
151      */
152     private void replaceListeners(final Multimap<Absolute, Reg<?>> newListeners) {
153         listeners = newListeners;
154         notifyListenerTypesChanged(newListeners.keySet());
155     }
156
157     @SuppressWarnings("checkstyle:IllegalCatch")
158     private void notifyListenerTypesChanged(final Set<Absolute> typesAfter) {
159         final List<? extends DOMNotificationSubscriptionListener> listenersAfter =
160                 subscriptionListeners.streamListeners().collect(ImmutableList.toImmutableList());
161         executor.execute(() -> {
162             for (final DOMNotificationSubscriptionListener subListener : listenersAfter) {
163                 try {
164                     subListener.onSubscriptionChanged(typesAfter);
165                 } catch (final Exception e) {
166                     LOG.warn("Uncaught exception during invoking listener {}", subListener, e);
167                 }
168             }
169         });
170     }
171
172     @Override
173     public <L extends DOMNotificationSubscriptionListener> ListenerRegistration<L> registerSubscriptionListener(
174             final L listener) {
175         final Set<Absolute> initialTypes = listeners.keySet();
176         executor.execute(() -> listener.onSubscriptionChanged(initialTypes));
177         return subscriptionListeners.register(listener);
178     }
179
180     @VisibleForTesting
181     ListenableFuture<? extends Object> publish(final DOMNotification notification,
182             final Collection<Reg<?>> subscribers) {
183         final List<ListenableFuture<Void>> futures = new ArrayList<>(subscribers.size());
184         subscribers.forEach(subscriber -> {
185             final DOMNotificationRouterEvent event = new DOMNotificationRouterEvent(notification);
186             futures.add(event.future());
187             queueNotificationManager.submitNotification(subscriber, event);
188         });
189         return Futures.transform(Futures.successfulAsList(futures), ignored -> (Void)null,
190             MoreExecutors.directExecutor());
191     }
192
193     @Override
194     public ListenableFuture<? extends Object> putNotification(final DOMNotification notification)
195             throws InterruptedException {
196         final var subscribers = listeners.get(notification.getType());
197         if (subscribers.isEmpty()) {
198             return NO_LISTENERS;
199         }
200
201         return publish(notification, subscribers);
202     }
203
204     @Override
205     public ListenableFuture<? extends Object> offerNotification(final DOMNotification notification) {
206         final var subscribers = listeners.get(notification.getType());
207         if (subscribers.isEmpty()) {
208             return NO_LISTENERS;
209         }
210
211         return publish(notification, subscribers);
212     }
213
214     @Override
215     public ListenableFuture<? extends Object> offerNotification(final DOMNotification notification, final long timeout,
216             final TimeUnit unit) throws InterruptedException {
217         final var subscribers = listeners.get(notification.getType());
218         if (subscribers.isEmpty()) {
219             return NO_LISTENERS;
220         }
221         // Attempt to perform a non-blocking publish first
222         final ListenableFuture<?> noBlock = publish(notification, subscribers);
223         if (!DOMNotificationPublishService.REJECTED.equals(noBlock)) {
224             return noBlock;
225         }
226
227         try {
228             final Thread publishThread = Thread.currentThread();
229             ScheduledFuture<?> timerTask = observer.schedule(publishThread::interrupt, timeout, unit);
230             final ListenableFuture<?> withBlock = putNotification(notification);
231             timerTask.cancel(true);
232             if (observer.getQueue().size() > 50) {
233                 observer.purge();
234             }
235             return withBlock;
236         } catch (InterruptedException e) {
237             return DOMNotificationPublishService.REJECTED;
238         }
239     }
240
241     @PreDestroy
242     @Deactivate
243     @Override
244     public void close() {
245         observer.shutdown();
246         executor.shutdown();
247         LOG.info("DOM Notification Router stopped");
248     }
249
250     @VisibleForTesting
251     ExecutorService executor() {
252         return executor;
253     }
254
255     @VisibleForTesting
256     ExecutorService observer() {
257         return observer;
258     }
259
260     @VisibleForTesting
261     Multimap<Absolute, ?> listeners() {
262         return listeners;
263     }
264
265     @VisibleForTesting
266     ListenerRegistry<DOMNotificationSubscriptionListener> subscriptionListeners() {
267         return subscriptionListeners;
268     }
269
270     private static void deliverEvents(final AbstractListenerRegistration<? extends DOMNotificationListener> reg,
271             final ImmutableList<DOMNotificationRouterEvent> events) {
272         if (reg.notClosed()) {
273             final DOMNotificationListener listener = reg.getInstance();
274             for (DOMNotificationRouterEvent event : events) {
275                 event.deliverTo(listener);
276             }
277         } else {
278             events.forEach(DOMNotificationRouterEvent::clear);
279         }
280     }
281 }