2 * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.mdsal.dom.broker;
10 import com.google.common.annotations.VisibleForTesting;
11 import com.google.common.collect.ImmutableList;
12 import com.google.common.collect.ImmutableMultimap;
13 import com.google.common.collect.Multimap;
14 import com.google.common.collect.Multimaps;
15 import com.google.common.util.concurrent.Futures;
16 import com.google.common.util.concurrent.ListenableFuture;
17 import com.google.common.util.concurrent.MoreExecutors;
18 import com.google.common.util.concurrent.ThreadFactoryBuilder;
19 import java.util.ArrayList;
20 import java.util.Collection;
21 import java.util.HashMap;
22 import java.util.List;
25 import java.util.concurrent.ExecutorService;
26 import java.util.concurrent.Executors;
27 import java.util.concurrent.ScheduledThreadPoolExecutor;
28 import java.util.concurrent.TimeUnit;
29 import javax.annotation.PreDestroy;
30 import javax.inject.Inject;
31 import org.eclipse.jdt.annotation.NonNull;
32 import org.opendaylight.mdsal.dom.api.DOMNotification;
33 import org.opendaylight.mdsal.dom.api.DOMNotificationListener;
34 import org.opendaylight.mdsal.dom.api.DOMNotificationPublishService;
35 import org.opendaylight.mdsal.dom.api.DOMNotificationService;
36 import org.opendaylight.mdsal.dom.spi.DOMNotificationSubscriptionListener;
37 import org.opendaylight.mdsal.dom.spi.DOMNotificationSubscriptionListenerRegistry;
38 import org.opendaylight.yangtools.concepts.AbstractListenerRegistration;
39 import org.opendaylight.yangtools.concepts.AbstractRegistration;
40 import org.opendaylight.yangtools.concepts.ListenerRegistration;
41 import org.opendaylight.yangtools.concepts.Registration;
42 import org.opendaylight.yangtools.util.ListenerRegistry;
43 import org.opendaylight.yangtools.util.concurrent.EqualityQueuedNotificationManager;
44 import org.opendaylight.yangtools.util.concurrent.QueuedNotificationManager;
45 import org.opendaylight.yangtools.yang.common.Empty;
46 import org.opendaylight.yangtools.yang.model.api.stmt.SchemaNodeIdentifier.Absolute;
47 import org.osgi.service.component.annotations.Activate;
48 import org.osgi.service.component.annotations.Component;
49 import org.osgi.service.component.annotations.Deactivate;
50 import org.osgi.service.metatype.annotations.AttributeDefinition;
51 import org.osgi.service.metatype.annotations.Designate;
52 import org.osgi.service.metatype.annotations.ObjectClassDefinition;
53 import org.slf4j.Logger;
54 import org.slf4j.LoggerFactory;
57 * Joint implementation of {@link DOMNotificationPublishService} and {@link DOMNotificationService}. Provides
58 * routing of notifications from publishers to subscribers.
61 * Internal implementation one by using a {@link QueuedNotificationManager}.
64 @Component(immediate = true, configurationPid = "org.opendaylight.mdsal.dom.notification", service = {
65 DOMNotificationService.class, DOMNotificationPublishService.class,
66 DOMNotificationSubscriptionListenerRegistry.class
68 @Designate(ocd = DOMNotificationRouter.Config.class)
69 // Non-final for testing
70 public class DOMNotificationRouter implements AutoCloseable, DOMNotificationPublishService,
71 DOMNotificationService, DOMNotificationSubscriptionListenerRegistry {
72 @ObjectClassDefinition()
73 public @interface Config {
74 @AttributeDefinition(name = "notification-queue-depth")
75 int queueDepth() default 65536;
79 abstract static sealed class Reg<T extends DOMNotificationListener> extends AbstractListenerRegistration<T> {
80 Reg(final @NonNull T listener) {
85 private final class SingleReg<T extends DOMNotificationListener> extends Reg<T> {
86 SingleReg(final @NonNull T listener) {
91 protected void removeRegistration() {
92 DOMNotificationRouter.this.removeRegistration(this);
96 private static final class ComponentReg extends Reg<DOMNotificationListener> {
97 ComponentReg(final @NonNull DOMNotificationListener listener) {
102 protected void removeRegistration() {
107 private static final Logger LOG = LoggerFactory.getLogger(DOMNotificationRouter.class);
108 private static final @NonNull ListenableFuture<?> NO_LISTENERS = Futures.immediateFuture(Empty.value());
110 private final ListenerRegistry<DOMNotificationSubscriptionListener> subscriptionListeners =
111 ListenerRegistry.create();
112 private final EqualityQueuedNotificationManager<AbstractListenerRegistration<? extends DOMNotificationListener>,
113 DOMNotificationRouterEvent> queueNotificationManager;
114 private final ScheduledThreadPoolExecutor observer;
115 private final ExecutorService executor;
117 private volatile Multimap<Absolute, Reg<?>> listeners = ImmutableMultimap.of();
120 public DOMNotificationRouter(final int maxQueueCapacity) {
121 observer = new ScheduledThreadPoolExecutor(1, new ThreadFactoryBuilder()
123 .setNameFormat("DOMNotificationRouter-observer-%d")
125 executor = Executors.newCachedThreadPool(new ThreadFactoryBuilder()
127 .setNameFormat("DOMNotificationRouter-listeners-%d")
129 queueNotificationManager = new EqualityQueuedNotificationManager<>("DOMNotificationRouter", executor,
130 maxQueueCapacity, DOMNotificationRouter::deliverEvents);
131 LOG.info("DOM Notification Router started");
135 public DOMNotificationRouter(final Config config) {
136 this(config.queueDepth());
140 public synchronized <T extends DOMNotificationListener> ListenerRegistration<T> registerNotificationListener(
141 final T listener, final Collection<Absolute> types) {
142 final var reg = new SingleReg<>(listener);
144 if (!types.isEmpty()) {
145 final var b = ImmutableMultimap.<Absolute, Reg<?>>builder();
148 for (var t : types) {
152 replaceListeners(b.build());
159 public synchronized Registration registerNotificationListeners(
160 final Map<Absolute, DOMNotificationListener> typeToListener) {
161 final var b = ImmutableMultimap.<Absolute, Reg<?>>builder();
164 final var tmp = new HashMap<DOMNotificationListener, ComponentReg>();
165 for (var e : typeToListener.entrySet()) {
166 b.put(e.getKey(), tmp.computeIfAbsent(e.getValue(), ComponentReg::new));
168 replaceListeners(b.build());
170 final var regs = List.copyOf(tmp.values());
171 return new AbstractRegistration() {
173 protected void removeRegistration() {
174 regs.forEach(ComponentReg::close);
175 removeRegistrations(regs);
180 private synchronized void removeRegistration(final SingleReg<?> reg) {
181 replaceListeners(ImmutableMultimap.copyOf(Multimaps.filterValues(listeners, input -> input != reg)));
184 private synchronized void removeRegistrations(final List<ComponentReg> regs) {
185 replaceListeners(ImmutableMultimap.copyOf(Multimaps.filterValues(listeners, input -> !regs.contains(input))));
189 * Swaps registered listeners and triggers notification update.
191 * @param newListeners is used to notify listenerTypes changed
193 private void replaceListeners(final Multimap<Absolute, Reg<?>> newListeners) {
194 listeners = newListeners;
195 notifyListenerTypesChanged(newListeners.keySet());
198 @SuppressWarnings("checkstyle:IllegalCatch")
199 private void notifyListenerTypesChanged(final Set<Absolute> typesAfter) {
200 final var listenersAfter = subscriptionListeners.streamListeners().collect(ImmutableList.toImmutableList());
201 executor.execute(() -> {
202 for (var subListener : listenersAfter) {
204 subListener.onSubscriptionChanged(typesAfter);
205 } catch (final Exception e) {
206 LOG.warn("Uncaught exception during invoking listener {}", subListener, e);
213 public <L extends DOMNotificationSubscriptionListener> ListenerRegistration<L> registerSubscriptionListener(
215 final var initialTypes = listeners.keySet();
216 executor.execute(() -> listener.onSubscriptionChanged(initialTypes));
217 return subscriptionListeners.register(listener);
221 @NonNull ListenableFuture<?> publish(final DOMNotification notification, final Collection<Reg<?>> subscribers) {
222 final var futures = new ArrayList<ListenableFuture<?>>(subscribers.size());
223 subscribers.forEach(subscriber -> {
224 final var event = new DOMNotificationRouterEvent(notification);
225 futures.add(event.future());
226 queueNotificationManager.submitNotification(subscriber, event);
228 return Futures.transform(Futures.successfulAsList(futures), ignored -> Empty.value(),
229 MoreExecutors.directExecutor());
233 public ListenableFuture<? extends Object> putNotification(final DOMNotification notification)
234 throws InterruptedException {
235 final var subscribers = listeners.get(notification.getType());
236 return subscribers.isEmpty() ? NO_LISTENERS : publish(notification, subscribers);
240 public ListenableFuture<? extends Object> offerNotification(final DOMNotification notification) {
241 final var subscribers = listeners.get(notification.getType());
242 return subscribers.isEmpty() ? NO_LISTENERS : publish(notification, subscribers);
246 public ListenableFuture<? extends Object> offerNotification(final DOMNotification notification, final long timeout,
247 final TimeUnit unit) throws InterruptedException {
248 final var subscribers = listeners.get(notification.getType());
249 if (subscribers.isEmpty()) {
252 // Attempt to perform a non-blocking publish first
253 final var noBlock = publish(notification, subscribers);
254 if (!DOMNotificationPublishService.REJECTED.equals(noBlock)) {
259 final var publishThread = Thread.currentThread();
260 final var timerTask = observer.schedule(publishThread::interrupt, timeout, unit);
261 final var withBlock = putNotification(notification);
262 timerTask.cancel(true);
263 if (observer.getQueue().size() > 50) {
267 } catch (InterruptedException e) {
268 return DOMNotificationPublishService.REJECTED;
275 public void close() {
278 LOG.info("DOM Notification Router stopped");
282 ExecutorService executor() {
287 ExecutorService observer() {
292 Multimap<Absolute, ?> listeners() {
297 ListenerRegistry<DOMNotificationSubscriptionListener> subscriptionListeners() {
298 return subscriptionListeners;
301 private static void deliverEvents(final AbstractListenerRegistration<? extends DOMNotificationListener> reg,
302 final ImmutableList<DOMNotificationRouterEvent> events) {
303 if (reg.notClosed()) {
304 final var listener = reg.getInstance();
305 for (var event : events) {
306 event.deliverTo(listener);
309 events.forEach(DOMNotificationRouterEvent::clear);