Do not use ListenerRegistration in DOMSchemaService
[mdsal.git] / dom / mdsal-dom-broker / src / main / java / org / opendaylight / mdsal / dom / broker / DOMNotificationRouter.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.mdsal.dom.broker;
9
10 import com.google.common.annotations.VisibleForTesting;
11 import com.google.common.collect.ImmutableList;
12 import com.google.common.collect.ImmutableMultimap;
13 import com.google.common.collect.ImmutableMultimap.Builder;
14 import com.google.common.collect.Multimap;
15 import com.google.common.collect.Multimaps;
16 import com.google.common.util.concurrent.Futures;
17 import com.google.common.util.concurrent.ListenableFuture;
18 import com.google.common.util.concurrent.MoreExecutors;
19 import com.google.common.util.concurrent.ThreadFactoryBuilder;
20 import java.util.ArrayList;
21 import java.util.Collection;
22 import java.util.List;
23 import java.util.Set;
24 import java.util.concurrent.ExecutorService;
25 import java.util.concurrent.Executors;
26 import java.util.concurrent.ScheduledFuture;
27 import java.util.concurrent.ScheduledThreadPoolExecutor;
28 import java.util.concurrent.TimeUnit;
29 import javax.annotation.PreDestroy;
30 import javax.inject.Inject;
31 import org.opendaylight.mdsal.dom.api.DOMNotification;
32 import org.opendaylight.mdsal.dom.api.DOMNotificationListener;
33 import org.opendaylight.mdsal.dom.api.DOMNotificationPublishService;
34 import org.opendaylight.mdsal.dom.api.DOMNotificationService;
35 import org.opendaylight.mdsal.dom.spi.DOMNotificationSubscriptionListener;
36 import org.opendaylight.mdsal.dom.spi.DOMNotificationSubscriptionListenerRegistry;
37 import org.opendaylight.yangtools.concepts.AbstractListenerRegistration;
38 import org.opendaylight.yangtools.concepts.ListenerRegistration;
39 import org.opendaylight.yangtools.util.ListenerRegistry;
40 import org.opendaylight.yangtools.util.concurrent.EqualityQueuedNotificationManager;
41 import org.opendaylight.yangtools.util.concurrent.FluentFutures;
42 import org.opendaylight.yangtools.util.concurrent.QueuedNotificationManager;
43 import org.opendaylight.yangtools.yang.model.api.stmt.SchemaNodeIdentifier.Absolute;
44 import org.osgi.service.component.annotations.Activate;
45 import org.osgi.service.component.annotations.Component;
46 import org.osgi.service.component.annotations.Deactivate;
47 import org.osgi.service.metatype.annotations.AttributeDefinition;
48 import org.osgi.service.metatype.annotations.Designate;
49 import org.osgi.service.metatype.annotations.ObjectClassDefinition;
50 import org.slf4j.Logger;
51 import org.slf4j.LoggerFactory;
52
53 /**
54  * Joint implementation of {@link DOMNotificationPublishService} and {@link DOMNotificationService}. Provides
55  * routing of notifications from publishers to subscribers.
56  *
57  *<p>
58  * Internal implementation one by using a {@link QueuedNotificationManager}.
59  *</p>
60  */
61 @Component(immediate = true, configurationPid = "org.opendaylight.mdsal.dom.notification", service = {
62     DOMNotificationService.class, DOMNotificationPublishService.class,
63     DOMNotificationSubscriptionListenerRegistry.class
64 })
65 @Designate(ocd = DOMNotificationRouter.Config.class)
66 // Non-final for testing
67 public class DOMNotificationRouter implements AutoCloseable, DOMNotificationPublishService,
68         DOMNotificationService, DOMNotificationSubscriptionListenerRegistry {
69     @ObjectClassDefinition()
70     public @interface Config {
71         @AttributeDefinition(name = "notification-queue-depth")
72         int queueDepth() default 65536;
73     }
74
75     private static final Logger LOG = LoggerFactory.getLogger(DOMNotificationRouter.class);
76     private static final ListenableFuture<Void> NO_LISTENERS = FluentFutures.immediateNullFluentFuture();
77
78     private final ListenerRegistry<DOMNotificationSubscriptionListener> subscriptionListeners =
79             ListenerRegistry.create();
80     private final EqualityQueuedNotificationManager<AbstractListenerRegistration<? extends DOMNotificationListener>,
81                 DOMNotificationRouterEvent> queueNotificationManager;
82     private final ScheduledThreadPoolExecutor observer;
83     private final ExecutorService executor;
84
85     private volatile Multimap<Absolute, AbstractListenerRegistration<? extends DOMNotificationListener>> listeners =
86             ImmutableMultimap.of();
87
88     @Inject
89     public DOMNotificationRouter(final int maxQueueCapacity) {
90         observer = new ScheduledThreadPoolExecutor(1, new ThreadFactoryBuilder()
91             .setDaemon(true)
92             .setNameFormat("DOMNotificationRouter-observer-%d")
93             .build());
94         executor = Executors.newCachedThreadPool(new ThreadFactoryBuilder()
95             .setDaemon(true)
96             .setNameFormat("DOMNotificationRouter-listeners-%d")
97             .build());
98         queueNotificationManager = new EqualityQueuedNotificationManager<>("DOMNotificationRouter", executor,
99                 maxQueueCapacity, DOMNotificationRouter::deliverEvents);
100     }
101
102     @Activate
103     public DOMNotificationRouter(final Config config) {
104         this(config.queueDepth());
105         LOG.info("DOM Notification Router started");
106     }
107
108     @Deprecated(forRemoval = true)
109     public static DOMNotificationRouter create(final int maxQueueCapacity) {
110         return new DOMNotificationRouter(maxQueueCapacity);
111     }
112
113     @Override
114     public synchronized <T extends DOMNotificationListener> ListenerRegistration<T> registerNotificationListener(
115             final T listener, final Collection<Absolute> types) {
116         final AbstractListenerRegistration<T> reg = new AbstractListenerRegistration<>(listener) {
117             @Override
118             protected void removeRegistration() {
119                 synchronized (DOMNotificationRouter.this) {
120                     replaceListeners(ImmutableMultimap.copyOf(Multimaps.filterValues(listeners,
121                         input -> input != this)));
122                 }
123             }
124         };
125
126         if (!types.isEmpty()) {
127             final Builder<Absolute, AbstractListenerRegistration<? extends DOMNotificationListener>> b =
128                     ImmutableMultimap.builder();
129             b.putAll(listeners);
130
131             for (final Absolute t : types) {
132                 b.put(t, reg);
133             }
134
135             replaceListeners(b.build());
136         }
137
138         return reg;
139     }
140
141     /**
142      * Swaps registered listeners and triggers notification update.
143      *
144      * @param newListeners is used to notify listenerTypes changed
145      */
146     private void replaceListeners(
147             final Multimap<Absolute, AbstractListenerRegistration<? extends DOMNotificationListener>> newListeners) {
148         listeners = newListeners;
149         notifyListenerTypesChanged(newListeners.keySet());
150     }
151
152     @SuppressWarnings("checkstyle:IllegalCatch")
153     private void notifyListenerTypesChanged(final Set<Absolute> typesAfter) {
154         final List<? extends DOMNotificationSubscriptionListener> listenersAfter =
155                 subscriptionListeners.streamListeners().collect(ImmutableList.toImmutableList());
156         executor.execute(() -> {
157             for (final DOMNotificationSubscriptionListener subListener : listenersAfter) {
158                 try {
159                     subListener.onSubscriptionChanged(typesAfter);
160                 } catch (final Exception e) {
161                     LOG.warn("Uncaught exception during invoking listener {}", subListener, e);
162                 }
163             }
164         });
165     }
166
167     @Override
168     public <L extends DOMNotificationSubscriptionListener> ListenerRegistration<L> registerSubscriptionListener(
169             final L listener) {
170         final Set<Absolute> initialTypes = listeners.keySet();
171         executor.execute(() -> listener.onSubscriptionChanged(initialTypes));
172         return subscriptionListeners.register(listener);
173     }
174
175
176     @VisibleForTesting
177     ListenableFuture<? extends Object> publish(final DOMNotification notification,
178             final Collection<AbstractListenerRegistration<? extends DOMNotificationListener>> subscribers) {
179         final List<ListenableFuture<Void>> futures = new ArrayList<>(subscribers.size());
180         subscribers.forEach(subscriber -> {
181             final DOMNotificationRouterEvent event = new DOMNotificationRouterEvent(notification);
182             futures.add(event.future());
183             queueNotificationManager.submitNotification(subscriber, event);
184         });
185         return Futures.transform(Futures.successfulAsList(futures), ignored -> (Void)null,
186             MoreExecutors.directExecutor());
187     }
188
189     @Override
190     public ListenableFuture<? extends Object> putNotification(final DOMNotification notification)
191             throws InterruptedException {
192         final Collection<AbstractListenerRegistration<? extends DOMNotificationListener>> subscribers =
193                 listeners.get(notification.getType());
194         if (subscribers.isEmpty()) {
195             return NO_LISTENERS;
196         }
197
198         return publish(notification, subscribers);
199     }
200
201     @Override
202     public ListenableFuture<? extends Object> offerNotification(final DOMNotification notification) {
203         final Collection<AbstractListenerRegistration<? extends DOMNotificationListener>> subscribers =
204                 listeners.get(notification.getType());
205         if (subscribers.isEmpty()) {
206             return NO_LISTENERS;
207         }
208
209         return publish(notification, subscribers);
210     }
211
212     @Override
213     public ListenableFuture<? extends Object> offerNotification(final DOMNotification notification, final long timeout,
214             final TimeUnit unit) throws InterruptedException {
215         final Collection<AbstractListenerRegistration<? extends DOMNotificationListener>> subscribers =
216                 listeners.get(notification.getType());
217         if (subscribers.isEmpty()) {
218             return NO_LISTENERS;
219         }
220         // Attempt to perform a non-blocking publish first
221         final ListenableFuture<?> noBlock = publish(notification, subscribers);
222         if (!DOMNotificationPublishService.REJECTED.equals(noBlock)) {
223             return noBlock;
224         }
225
226         try {
227             final Thread publishThread = Thread.currentThread();
228             ScheduledFuture<?> timerTask = observer.schedule(publishThread::interrupt, timeout, unit);
229             final ListenableFuture<?> withBlock = putNotification(notification);
230             timerTask.cancel(true);
231             if (observer.getQueue().size() > 50) {
232                 observer.purge();
233             }
234             return withBlock;
235         } catch (InterruptedException e) {
236             return DOMNotificationPublishService.REJECTED;
237         }
238     }
239
240     @PreDestroy
241     @Deactivate
242     @Override
243     public void close() {
244         observer.shutdown();
245         executor.shutdown();
246         LOG.info("DOM Notification Router stopped");
247     }
248
249     @VisibleForTesting
250     ExecutorService executor() {
251         return executor;
252     }
253
254     @VisibleForTesting
255     ExecutorService observer() {
256         return observer;
257     }
258
259     @VisibleForTesting
260     Multimap<Absolute, ?> listeners() {
261         return listeners;
262     }
263
264     @VisibleForTesting
265     ListenerRegistry<DOMNotificationSubscriptionListener> subscriptionListeners() {
266         return subscriptionListeners;
267     }
268
269     private static void deliverEvents(final AbstractListenerRegistration<? extends DOMNotificationListener> reg,
270             final ImmutableList<DOMNotificationRouterEvent> events) {
271         if (reg.notClosed()) {
272             final DOMNotificationListener listener = reg.getInstance();
273             for (DOMNotificationRouterEvent event : events) {
274                 event.deliverTo(listener);
275             }
276         } else {
277             events.forEach(DOMNotificationRouterEvent::clear);
278         }
279     }
280 }