2 * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.mdsal.dom.broker;
10 import com.google.common.annotations.VisibleForTesting;
11 import com.google.common.collect.ImmutableList;
12 import com.google.common.collect.ImmutableMultimap;
13 import com.google.common.collect.ImmutableSet;
14 import com.google.common.collect.Multimaps;
15 import com.google.common.util.concurrent.Futures;
16 import com.google.common.util.concurrent.ListenableFuture;
17 import com.google.common.util.concurrent.MoreExecutors;
18 import com.google.common.util.concurrent.ThreadFactoryBuilder;
19 import java.util.ArrayList;
20 import java.util.Collection;
21 import java.util.HashMap;
22 import java.util.List;
24 import java.util.concurrent.ExecutorService;
25 import java.util.concurrent.Executors;
26 import java.util.concurrent.ScheduledThreadPoolExecutor;
27 import java.util.concurrent.TimeUnit;
28 import javax.annotation.PreDestroy;
29 import javax.inject.Inject;
30 import javax.inject.Singleton;
31 import org.eclipse.jdt.annotation.NonNull;
32 import org.opendaylight.mdsal.dom.api.DOMNotification;
33 import org.opendaylight.mdsal.dom.api.DOMNotificationListener;
34 import org.opendaylight.mdsal.dom.api.DOMNotificationPublishDemandExtension;
35 import org.opendaylight.mdsal.dom.api.DOMNotificationPublishDemandExtension.DemandListener;
36 import org.opendaylight.mdsal.dom.api.DOMNotificationPublishService;
37 import org.opendaylight.mdsal.dom.api.DOMNotificationService;
38 import org.opendaylight.yangtools.concepts.AbstractObjectRegistration;
39 import org.opendaylight.yangtools.concepts.AbstractRegistration;
40 import org.opendaylight.yangtools.concepts.Registration;
41 import org.opendaylight.yangtools.util.ObjectRegistry;
42 import org.opendaylight.yangtools.util.concurrent.EqualityQueuedNotificationManager;
43 import org.opendaylight.yangtools.util.concurrent.QueuedNotificationManager;
44 import org.opendaylight.yangtools.yang.common.Empty;
45 import org.opendaylight.yangtools.yang.model.api.stmt.SchemaNodeIdentifier.Absolute;
46 import org.osgi.service.component.annotations.Activate;
47 import org.osgi.service.component.annotations.Component;
48 import org.osgi.service.component.annotations.Deactivate;
49 import org.osgi.service.metatype.annotations.AttributeDefinition;
50 import org.osgi.service.metatype.annotations.Designate;
51 import org.osgi.service.metatype.annotations.ObjectClassDefinition;
52 import org.slf4j.Logger;
53 import org.slf4j.LoggerFactory;
56 * Joint implementation of {@link DOMNotificationPublishService} and {@link DOMNotificationService}. Provides
57 * routing of notifications from publishers to subscribers.
60 * The internal implementation is done by using a {@link QueuedNotificationManager}.
64 @Component(configurationPid = "org.opendaylight.mdsal.dom.notification", service = DOMNotificationRouter.class)
65 @Designate(ocd = DOMNotificationRouter.Config.class)
66 // Non-final for testing
67 public class DOMNotificationRouter implements AutoCloseable {
68 @ObjectClassDefinition()
69 public @interface Config {
70 @AttributeDefinition(name = "notification-queue-depth")
71 int queueDepth() default 65536;
// Common base type of the listener registrations tracked by the router. It is a sealed, package-private
// registration of a DOMNotificationListener.
75 abstract static sealed class Reg extends AbstractObjectRegistration<DOMNotificationListener> {
// NOTE(review): the constructor body is not visible in this excerpt — presumably it hands the listener to the
// superclass; confirm against the full file.
76 Reg(final @NonNull DOMNotificationListener listener) {
// Registration of one listener that removes itself from the enclosing router when it is removed.
// Non-static on purpose: removeRegistration() below needs the enclosing DOMNotificationRouter instance.
81 private final class SingleReg extends Reg {
// NOTE(review): constructor body not visible in this excerpt — presumably delegates to Reg; confirm.
82 SingleReg(final @NonNull DOMNotificationListener listener) {
// Removal callback: asks the enclosing router to drop this registration from its listener map.
87 protected void removeRegistration() {
88 DOMNotificationRouter.this.removeRegistration(this);
// Registration used for listeners registered in bulk through registerNotificationListeners(); static, so it
// holds no reference to the router.
92 private static final class ComponentReg extends Reg {
// NOTE(review): constructor body not visible in this excerpt — presumably delegates to Reg; confirm.
93 ComponentReg(final @NonNull DOMNotificationListener listener) {
// NOTE(review): the body of this callback is not visible in this excerpt. The bulk registration created in
// registerNotificationListeners() calls removeRegistrations(regs) itself after closing each ComponentReg,
// so this is presumably a no-op — confirm.
98 protected void removeRegistration() {
103 private final class PublishFacade implements DOMNotificationPublishService, DOMNotificationPublishDemandExtension {
105 public List<Extension> supportedExtensions() {
106 return List.of(this);
110 public ListenableFuture<? extends Object> putNotification(final DOMNotification notification)
111 throws InterruptedException {
112 return putNotificationImpl(notification);
116 public ListenableFuture<? extends Object> offerNotification(final DOMNotification notification) {
117 final var subscribers = listeners.get(notification.getType());
118 return subscribers.isEmpty() ? NO_LISTENERS : publish(notification, subscribers);
// Offers a notification with a timeout. A plain publish is attempted first; the timed path further down is
// only relevant when that first attempt's result equals DOMNotificationPublishService.REJECTED.
// NOTE(review): several lines of this method are missing from this excerpt (the bodies of the two early-exit
// `if` statements, the opening of the try block, the body of the queue-size check and the normal return) —
// confirm the details against the full file before changing anything here.
122 public ListenableFuture<? extends Object> offerNotification(final DOMNotification notification,
123 final long timeout, final TimeUnit unit) throws InterruptedException {
// Registrations currently interested in this notification's type
124 final var subscribers = listeners.get(notification.getType());
125 if (subscribers.isEmpty()) {
128 // Attempt to perform a non-blocking publish first
129 final var noBlock = publish(notification, subscribers);
130 if (!DOMNotificationPublishService.REJECTED.equals(noBlock)) {
// Timed path: schedule a task on the observer executor that interrupts the calling thread once the timeout
// elapses, then publish again through putNotificationImpl().
135 final var publishThread = Thread.currentThread();
136 final var timerTask = observer.schedule(publishThread::interrupt, timeout, unit);
137 final var withBlock = putNotificationImpl(notification);
// The publish call has returned, so the pending interrupt is no longer wanted
138 timerTask.cancel(true);
// NOTE(review): 50 is an unexplained threshold on the observer's work queue size; the guarded statement is
// not visible in this excerpt.
139 if (observer.getQueue().size() > 50) {
// Any interrupt of the publishing thread (presumably the scheduled timeout) is reported as REJECTED rather
// than propagated to the caller.
143 } catch (InterruptedException e) {
144 return DOMNotificationPublishService.REJECTED;
149 public Registration registerDemandListener(final DemandListener listener) {
150 final var initialTypes = listeners.keySet();
151 executor.execute(() -> listener.onDemandUpdated(initialTypes));
152 return demandListeners.register(listener);
156 private final class SubscribeFacade implements DOMNotificationService {
// Registers a single listener for a collection of notification types.
// NOTE(review): lines are missing from this excerpt (how the builder is seeded, the loop body and the return
// statement) — confirm against the full file.
158 public Registration registerNotificationListener(final DOMNotificationListener listener,
159 final Collection<Absolute> types) {
// All changes to the listener map are made while holding the router's monitor
160 synchronized (DOMNotificationRouter.this) {
161 final var reg = new SingleReg(listener);
// The listener map is only rebuilt and swapped when at least one type was requested
163 if (!types.isEmpty()) {
164 final var b = ImmutableMultimap.<Absolute, Reg>builder();
167 for (var t : types) {
// Publish the rebuilt map; replaceListeners() also notifies demand listeners
171 replaceListeners(b.build());
// Registers several listeners at once from a type-to-listener map and returns one registration covering
// all of them.
// NOTE(review): the method is `synchronized` on the facade and then synchronizes on the router as well; the
// outer lock looks redundant, since the other mutators only use the router's monitor — confirm before removing.
// NOTE(review): at least one line following the builder creation is missing from this excerpt — confirm how
// the builder is seeded against the full file.
179 public synchronized Registration registerNotificationListeners(
180 final Map<Absolute, DOMNotificationListener> typeToListener) {
181 synchronized (DOMNotificationRouter.this) {
182 final var b = ImmutableMultimap.<Absolute, Reg>builder();
// One ComponentReg per distinct listener, shared by every type that listener is mapped from
185 final var tmp = new HashMap<DOMNotificationListener, ComponentReg>();
186 for (var e : typeToListener.entrySet()) {
187 b.put(e.getKey(), tmp.computeIfAbsent(e.getValue(), ComponentReg::new));
// Publish the rebuilt map; replaceListeners() also notifies demand listeners
189 replaceListeners(b.build());
// Immutable snapshot of the created registrations, captured by the returned registration
191 final var regs = List.copyOf(tmp.values());
192 return new AbstractRegistration() {
// On removal: close every per-listener registration, then drop them all from the router in one step
194 protected void removeRegistration() {
195 regs.forEach(ComponentReg::close);
196 removeRegistrations(regs);
203 private static final Logger LOG = LoggerFactory.getLogger(DOMNotificationRouter.class);
// Already-completed future holding Empty.value(); returned by the publish paths when no listener is
// registered for a notification's type.
204 private static final @NonNull ListenableFuture<?> NO_LISTENERS = Futures.immediateFuture(Empty.value());
// Queues router events per registration; created in the constructor with deliverEvents() as its callback
206 private final EqualityQueuedNotificationManager<Reg, DOMNotificationRouterEvent> queueNotificationManager;
// Publisher-side and subscriber-side facades handed out by the accessor methods
207 private final @NonNull DOMNotificationPublishService notificationPublishService = new PublishFacade();
208 private final @NonNull DOMNotificationService notificationService = new SubscribeFacade();
// Listeners interested in which notification types currently have subscribers
209 private final ObjectRegistry<DemandListener> demandListeners =
210 ObjectRegistry.createConcurrent("notification demand listeners");
// Scheduler used by the timed offerNotification() to interrupt the publishing thread on timeout
211 private final ScheduledThreadPoolExecutor observer;
// Executor passed to the queue manager and used to run demand listener callbacks
212 private final ExecutorService executor;
// Current type-to-registration map. It is immutable and replaced wholesale by replaceListeners(); volatile,
// so the publish paths can read it without holding the router's monitor.
214 private volatile ImmutableMultimap<Absolute, Reg> listeners = ImmutableMultimap.of();
// Creates a router with the given maximum queue capacity: sets up the single-threaded observer scheduler,
// the cached listener thread pool and the queue manager that delivers events through deliverEvents().
// NOTE(review): parts of the two ThreadFactoryBuilder chains are missing from this excerpt (only the
// name formats are visible) — confirm the remaining thread settings against the full file.
217 public DOMNotificationRouter(final int maxQueueCapacity) {
218 observer = new ScheduledThreadPoolExecutor(1, new ThreadFactoryBuilder()
220 .setNameFormat("DOMNotificationRouter-observer-%d")
222 executor = Executors.newCachedThreadPool(new ThreadFactoryBuilder()
224 .setNameFormat("DOMNotificationRouter-listeners-%d")
// maxQueueCapacity is forwarded unchanged to the queue manager
226 queueNotificationManager = new EqualityQueuedNotificationManager<>("DOMNotificationRouter", executor,
227 maxQueueCapacity, DOMNotificationRouter::deliverEvents);
228 LOG.info("DOM Notification Router started");
/**
 * Creates a router from a configuration object.
 *
 * @param config configuration whose {@code queueDepth()} is used as the maximum queue capacity
 */
public DOMNotificationRouter(final Config config) {
    this(config.queueDepth());
}
236 public @NonNull DOMNotificationService notificationService() {
237 return notificationService;
240 public @NonNull DOMNotificationPublishService notificationPublishService() {
241 return notificationPublishService;
244 private synchronized void removeRegistration(final SingleReg reg) {
245 replaceListeners(ImmutableMultimap.copyOf(Multimaps.filterValues(listeners, input -> input != reg)));
248 private synchronized void removeRegistrations(final List<ComponentReg> regs) {
249 replaceListeners(ImmutableMultimap.copyOf(Multimaps.filterValues(listeners, input -> !regs.contains(input))));
253 * Swaps registered listeners and triggers notification update.
255 * @param newListeners is used to notify listenerTypes changed
257 private void replaceListeners(final ImmutableMultimap<Absolute, Reg> newListeners) {
258 listeners = newListeners;
259 notifyListenerTypesChanged(newListeners.keySet());
262 @SuppressWarnings("checkstyle:IllegalCatch")
263 private void notifyListenerTypesChanged(final @NonNull ImmutableSet<Absolute> typesAfter) {
264 final var listenersAfter = demandListeners.streamObjects().collect(ImmutableList.toImmutableList());
265 executor.execute(() -> {
266 for (var listener : listenersAfter) {
268 listener.onDemandUpdated(typesAfter);
269 } catch (final Exception e) {
270 LOG.warn("Uncaught exception during invoking listener {}", listener, e);
277 @NonNull ListenableFuture<? extends Object> putNotificationImpl(final DOMNotification notification)
278 throws InterruptedException {
279 final var subscribers = listeners.get(notification.getType());
280 return subscribers.isEmpty() ? NO_LISTENERS : publish(notification, subscribers);
284 @NonNull ListenableFuture<?> publish(final DOMNotification notification, final Collection<Reg> subscribers) {
285 final var futures = new ArrayList<ListenableFuture<?>>(subscribers.size());
286 subscribers.forEach(subscriber -> {
287 final var event = new DOMNotificationRouterEvent(notification);
288 futures.add(event.future());
289 queueNotificationManager.submitNotification(subscriber, event);
291 return Futures.transform(Futures.successfulAsList(futures), ignored -> Empty.value(),
292 MoreExecutors.directExecutor());
// Stops the router.
// NOTE(review): the statements between the method header and the log call are missing from this excerpt —
// presumably the observer and listener executors are shut down here; confirm against the full file.
298 public void close() {
301 LOG.info("DOM Notification Router stopped");
// Package-private accessor.
// NOTE(review): body not visible in this excerpt — presumably returns the `executor` field; confirm.
305 ExecutorService executor() {
// Package-private accessor.
// NOTE(review): body not visible in this excerpt — presumably returns the `observer` field; confirm.
310 ExecutorService observer() {
// Package-private accessor; the wildcard value type keeps Reg out of the signature.
// NOTE(review): body not visible in this excerpt — presumably returns the `listeners` field; confirm.
315 ImmutableMultimap<Absolute, ?> listeners() {
320 ObjectRegistry<DemandListener> demandListeners() {
321 return demandListeners;
324 private static void deliverEvents(final Reg reg, final ImmutableList<DOMNotificationRouterEvent> events) {
325 if (reg.notClosed()) {
326 final var listener = reg.getInstance();
327 for (var event : events) {
328 event.deliverTo(listener);
331 events.forEach(DOMNotificationRouterEvent::clear);