2 * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.mdsal.dom.broker;
10 import static java.util.Objects.requireNonNull;
12 import com.google.common.annotations.VisibleForTesting;
13 import com.google.common.collect.ImmutableList;
14 import com.google.common.collect.ImmutableMultimap;
15 import com.google.common.collect.ImmutableSet;
16 import com.google.common.collect.Multimaps;
17 import com.google.common.util.concurrent.Futures;
18 import com.google.common.util.concurrent.ListenableFuture;
19 import com.google.common.util.concurrent.MoreExecutors;
20 import com.google.common.util.concurrent.ThreadFactoryBuilder;
21 import java.util.ArrayList;
22 import java.util.Collection;
23 import java.util.HashMap;
24 import java.util.List;
26 import java.util.concurrent.ExecutorService;
27 import java.util.concurrent.Executors;
28 import java.util.concurrent.ScheduledThreadPoolExecutor;
29 import java.util.concurrent.TimeUnit;
30 import javax.annotation.PreDestroy;
31 import javax.inject.Inject;
32 import javax.inject.Singleton;
33 import org.eclipse.jdt.annotation.NonNull;
34 import org.opendaylight.mdsal.dom.api.DOMNotification;
35 import org.opendaylight.mdsal.dom.api.DOMNotificationListener;
36 import org.opendaylight.mdsal.dom.api.DOMNotificationPublishDemandExtension;
37 import org.opendaylight.mdsal.dom.api.DOMNotificationPublishDemandExtension.DemandListener;
38 import org.opendaylight.mdsal.dom.api.DOMNotificationPublishService;
39 import org.opendaylight.mdsal.dom.api.DOMNotificationService;
40 import org.opendaylight.yangtools.concepts.AbstractRegistration;
41 import org.opendaylight.yangtools.concepts.Registration;
42 import org.opendaylight.yangtools.util.ObjectRegistry;
43 import org.opendaylight.yangtools.util.concurrent.EqualityQueuedNotificationManager;
44 import org.opendaylight.yangtools.util.concurrent.QueuedNotificationManager;
45 import org.opendaylight.yangtools.yang.common.Empty;
46 import org.opendaylight.yangtools.yang.model.api.stmt.SchemaNodeIdentifier.Absolute;
47 import org.osgi.service.component.annotations.Activate;
48 import org.osgi.service.component.annotations.Component;
49 import org.osgi.service.component.annotations.Deactivate;
50 import org.osgi.service.metatype.annotations.AttributeDefinition;
51 import org.osgi.service.metatype.annotations.Designate;
52 import org.osgi.service.metatype.annotations.ObjectClassDefinition;
53 import org.slf4j.Logger;
54 import org.slf4j.LoggerFactory;
57 * Joint implementation of {@link DOMNotificationPublishService} and {@link DOMNotificationService}. Provides
58 * routing of notifications from publishers to subscribers.
61 * Internal implementation one by using a {@link QueuedNotificationManager}.
65 @Component(configurationPid = "org.opendaylight.mdsal.dom.notification", service = DOMNotificationRouter.class)
66 @Designate(ocd = DOMNotificationRouter.Config.class)
67 // Non-final for testing
68 public class DOMNotificationRouter implements AutoCloseable {
69 @ObjectClassDefinition()
70 public @interface Config {
71 @AttributeDefinition(name = "notification-queue-depth")
72 int queueDepth() default 65536;
76 abstract static sealed class Reg extends AbstractRegistration {
77 private final @NonNull DOMNotificationListener listener;
79 Reg(final @NonNull DOMNotificationListener listener) {
80 this.listener = requireNonNull(listener);
84 private final class SingleReg extends Reg {
85 SingleReg(final @NonNull DOMNotificationListener listener) {
90 protected void removeRegistration() {
91 DOMNotificationRouter.this.removeRegistration(this);
95 private static final class ComponentReg extends Reg {
96 ComponentReg(final @NonNull DOMNotificationListener listener) {
101 protected void removeRegistration() {
106 private final class PublishFacade implements DOMNotificationPublishService, DOMNotificationPublishDemandExtension {
108 public List<Extension> supportedExtensions() {
109 return List.of(this);
113 public ListenableFuture<? extends Object> putNotification(final DOMNotification notification)
114 throws InterruptedException {
115 return putNotificationImpl(notification);
119 public ListenableFuture<? extends Object> offerNotification(final DOMNotification notification) {
120 final var subscribers = listeners.get(notification.getType());
121 return subscribers.isEmpty() ? NO_LISTENERS : publish(notification, subscribers);
125 public ListenableFuture<? extends Object> offerNotification(final DOMNotification notification,
126 final long timeout, final TimeUnit unit) throws InterruptedException {
127 final var subscribers = listeners.get(notification.getType());
128 if (subscribers.isEmpty()) {
131 // Attempt to perform a non-blocking publish first
132 final var noBlock = publish(notification, subscribers);
133 if (!DOMNotificationPublishService.REJECTED.equals(noBlock)) {
138 final var publishThread = Thread.currentThread();
139 final var timerTask = observer.schedule(publishThread::interrupt, timeout, unit);
140 final var withBlock = putNotificationImpl(notification);
141 timerTask.cancel(true);
142 if (observer.getQueue().size() > 50) {
146 } catch (InterruptedException e) {
147 return DOMNotificationPublishService.REJECTED;
152 public Registration registerDemandListener(final DemandListener listener) {
153 final var initialTypes = listeners.keySet();
154 executor.execute(() -> listener.onDemandUpdated(initialTypes));
155 return demandListeners.register(listener);
159 private final class SubscribeFacade implements DOMNotificationService {
161 public Registration registerNotificationListener(final DOMNotificationListener listener,
162 final Collection<Absolute> types) {
163 synchronized (DOMNotificationRouter.this) {
164 final var reg = new SingleReg(listener);
166 if (!types.isEmpty()) {
167 final var b = ImmutableMultimap.<Absolute, Reg>builder();
170 for (var t : types) {
174 replaceListeners(b.build());
182 public synchronized Registration registerNotificationListeners(
183 final Map<Absolute, DOMNotificationListener> typeToListener) {
184 synchronized (DOMNotificationRouter.this) {
185 final var b = ImmutableMultimap.<Absolute, Reg>builder();
188 final var tmp = new HashMap<DOMNotificationListener, ComponentReg>();
189 for (var e : typeToListener.entrySet()) {
190 b.put(e.getKey(), tmp.computeIfAbsent(e.getValue(), ComponentReg::new));
192 replaceListeners(b.build());
194 final var regs = List.copyOf(tmp.values());
195 return new AbstractRegistration() {
197 protected void removeRegistration() {
198 regs.forEach(ComponentReg::close);
199 removeRegistrations(regs);
206 private static final Logger LOG = LoggerFactory.getLogger(DOMNotificationRouter.class);
207 private static final @NonNull ListenableFuture<?> NO_LISTENERS = Futures.immediateFuture(Empty.value());
209 private final EqualityQueuedNotificationManager<Reg, DOMNotificationRouterEvent> queueNotificationManager;
210 private final @NonNull DOMNotificationPublishService notificationPublishService = new PublishFacade();
211 private final @NonNull DOMNotificationService notificationService = new SubscribeFacade();
212 private final ObjectRegistry<DemandListener> demandListeners =
213 ObjectRegistry.createConcurrent("notification demand listeners");
214 private final ScheduledThreadPoolExecutor observer;
215 private final ExecutorService executor;
217 private volatile ImmutableMultimap<Absolute, Reg> listeners = ImmutableMultimap.of();
220 public DOMNotificationRouter(final int maxQueueCapacity) {
221 observer = new ScheduledThreadPoolExecutor(1, new ThreadFactoryBuilder()
223 .setNameFormat("DOMNotificationRouter-observer-%d")
225 executor = Executors.newCachedThreadPool(new ThreadFactoryBuilder()
227 .setNameFormat("DOMNotificationRouter-listeners-%d")
229 queueNotificationManager = new EqualityQueuedNotificationManager<>("DOMNotificationRouter", executor,
230 maxQueueCapacity, DOMNotificationRouter::deliverEvents);
231 LOG.info("DOM Notification Router started");
235 public DOMNotificationRouter(final Config config) {
236 this(config.queueDepth());
239 public @NonNull DOMNotificationService notificationService() {
240 return notificationService;
243 public @NonNull DOMNotificationPublishService notificationPublishService() {
244 return notificationPublishService;
247 private synchronized void removeRegistration(final SingleReg reg) {
248 replaceListeners(ImmutableMultimap.copyOf(Multimaps.filterValues(listeners, input -> input != reg)));
251 private synchronized void removeRegistrations(final List<ComponentReg> regs) {
252 replaceListeners(ImmutableMultimap.copyOf(Multimaps.filterValues(listeners, input -> !regs.contains(input))));
256 * Swaps registered listeners and triggers notification update.
258 * @param newListeners is used to notify listenerTypes changed
260 private void replaceListeners(final ImmutableMultimap<Absolute, Reg> newListeners) {
261 listeners = newListeners;
262 notifyListenerTypesChanged(newListeners.keySet());
265 @SuppressWarnings("checkstyle:IllegalCatch")
266 private void notifyListenerTypesChanged(final @NonNull ImmutableSet<Absolute> typesAfter) {
267 final var listenersAfter = demandListeners.streamObjects().collect(ImmutableList.toImmutableList());
268 executor.execute(() -> {
269 for (var listener : listenersAfter) {
271 listener.onDemandUpdated(typesAfter);
272 } catch (final Exception e) {
273 LOG.warn("Uncaught exception during invoking listener {}", listener, e);
280 @NonNull ListenableFuture<? extends Object> putNotificationImpl(final DOMNotification notification)
281 throws InterruptedException {
282 final var subscribers = listeners.get(notification.getType());
283 return subscribers.isEmpty() ? NO_LISTENERS : publish(notification, subscribers);
287 @NonNull ListenableFuture<?> publish(final DOMNotification notification, final Collection<Reg> subscribers) {
288 final var futures = new ArrayList<ListenableFuture<?>>(subscribers.size());
289 subscribers.forEach(subscriber -> {
290 final var event = new DOMNotificationRouterEvent(notification);
291 futures.add(event.future());
292 queueNotificationManager.submitNotification(subscriber, event);
294 return Futures.transform(Futures.successfulAsList(futures), ignored -> Empty.value(),
295 MoreExecutors.directExecutor());
301 public void close() {
304 LOG.info("DOM Notification Router stopped");
308 ExecutorService executor() {
313 ExecutorService observer() {
318 ImmutableMultimap<Absolute, ?> listeners() {
323 ObjectRegistry<DemandListener> demandListeners() {
324 return demandListeners;
327 private static void deliverEvents(final Reg reg, final ImmutableList<DOMNotificationRouterEvent> events) {
328 if (reg.notClosed()) {
329 final var listener = reg.listener;
330 for (var event : events) {
331 event.deliverTo(listener);
334 events.forEach(DOMNotificationRouterEvent::clear);