2 * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.mdsal.dom.broker;
10 import com.google.common.annotations.VisibleForTesting;
11 import com.google.common.collect.ImmutableList;
12 import com.google.common.collect.ImmutableMultimap;
13 import com.google.common.collect.ImmutableMultimap.Builder;
14 import com.google.common.collect.Multimap;
15 import com.google.common.collect.Multimaps;
16 import com.google.common.util.concurrent.Futures;
17 import com.google.common.util.concurrent.ListenableFuture;
18 import com.google.common.util.concurrent.MoreExecutors;
19 import com.google.common.util.concurrent.ThreadFactoryBuilder;
20 import java.util.ArrayList;
21 import java.util.Collection;
22 import java.util.List;
24 import java.util.concurrent.ExecutorService;
25 import java.util.concurrent.Executors;
26 import java.util.concurrent.ScheduledFuture;
27 import java.util.concurrent.ScheduledThreadPoolExecutor;
28 import java.util.concurrent.TimeUnit;
29 import javax.annotation.PreDestroy;
30 import javax.inject.Inject;
31 import org.opendaylight.mdsal.dom.api.DOMNotification;
32 import org.opendaylight.mdsal.dom.api.DOMNotificationListener;
33 import org.opendaylight.mdsal.dom.api.DOMNotificationPublishService;
34 import org.opendaylight.mdsal.dom.api.DOMNotificationService;
35 import org.opendaylight.mdsal.dom.spi.DOMNotificationSubscriptionListener;
36 import org.opendaylight.mdsal.dom.spi.DOMNotificationSubscriptionListenerRegistry;
37 import org.opendaylight.yangtools.concepts.AbstractListenerRegistration;
38 import org.opendaylight.yangtools.concepts.ListenerRegistration;
39 import org.opendaylight.yangtools.util.ListenerRegistry;
40 import org.opendaylight.yangtools.util.concurrent.EqualityQueuedNotificationManager;
41 import org.opendaylight.yangtools.util.concurrent.FluentFutures;
42 import org.opendaylight.yangtools.util.concurrent.QueuedNotificationManager;
43 import org.opendaylight.yangtools.yang.model.api.stmt.SchemaNodeIdentifier.Absolute;
44 import org.osgi.service.component.annotations.Activate;
45 import org.osgi.service.component.annotations.Component;
46 import org.osgi.service.component.annotations.Deactivate;
47 import org.osgi.service.metatype.annotations.AttributeDefinition;
48 import org.osgi.service.metatype.annotations.Designate;
49 import org.osgi.service.metatype.annotations.ObjectClassDefinition;
50 import org.slf4j.Logger;
51 import org.slf4j.LoggerFactory;
54 * Joint implementation of {@link DOMNotificationPublishService} and {@link DOMNotificationService}. Provides
55 * routing of notifications from publishers to subscribers.
58 * The internal implementation is done using a {@link QueuedNotificationManager}.
61 @Component(immediate = true, configurationPid = "org.opendaylight.mdsal.dom.notification", service = {
62 DOMNotificationService.class, DOMNotificationPublishService.class,
63 DOMNotificationSubscriptionListenerRegistry.class
65 @Designate(ocd = DOMNotificationRouter.Config.class)
66 // Non-final for testing
67 public class DOMNotificationRouter implements AutoCloseable, DOMNotificationPublishService,
68 DOMNotificationService, DOMNotificationSubscriptionListenerRegistry {
// OSGi metatype configuration for the router. It declares a single attribute, exposed under the name
// "notification-queue-depth", with a default of 65536. The Config-taking constructor passes queueDepth()
// on as the maxQueueCapacity argument of the int constructor.
69 @ObjectClassDefinition()
70 public @interface Config {
71 @AttributeDefinition(name = "notification-queue-depth")
72 int queueDepth() default 65536;
75 private static final Logger LOG = LoggerFactory.getLogger(DOMNotificationRouter.class);
// Shared future obtained from FluentFutures.immediateNullFluentFuture().
// NOTE(review): presumably returned by the publish methods when a notification type has no subscribers;
// the returning lines are not visible in this excerpt, so confirm.
76 private static final ListenableFuture<Void> NO_LISTENERS = FluentFutures.immediateNullFluentFuture();
// Listeners that get onSubscriptionChanged(...) callbacks whenever the set of subscribed types is replaced.
78 private final ListenerRegistry<DOMNotificationSubscriptionListener> subscriptionListeners =
79 ListenerRegistry.create();
// Queue of DOMNotificationRouterEvents keyed by listener registration; constructed with deliverEvents as its
// delivery callback and with 'executor' as its executor.
80 private final EqualityQueuedNotificationManager<AbstractListenerRegistration<? extends DOMNotificationListener>,
81 DOMNotificationRouterEvent> queueNotificationManager;
// Scheduler (core pool size 1) used by the timed offerNotification to schedule an interrupt of the publishing thread.
82 private final ScheduledThreadPoolExecutor observer;
// Cached thread pool: runs subscription-change callbacks and is handed to queueNotificationManager.
83 private final ExecutorService executor;
// Current mapping from notification type to listener registrations. In the visible code it is only ever
// reassigned as a whole (see replaceListeners); it is volatile, and the publish methods read it without
// being declared synchronized.
85 private volatile Multimap<Absolute, AbstractListenerRegistration<? extends DOMNotificationListener>> listeners =
86 ImmutableMultimap.of();
// Creates the router's two thread pools and its notification manager.
//   - observer: ScheduledThreadPoolExecutor with core pool size 1, threads named "DOMNotificationRouter-observer-%d"
//   - executor: cached thread pool, threads named "DOMNotificationRouter-listeners-%d"
//   - queueNotificationManager: named "DOMNotificationRouter", runs on 'executor', receives maxQueueCapacity,
//     and delivers batches through deliverEvents
// NOTE(review): the remaining calls of both ThreadFactoryBuilder chains are not visible in this excerpt.
89 public DOMNotificationRouter(final int maxQueueCapacity) {
90 observer = new ScheduledThreadPoolExecutor(1, new ThreadFactoryBuilder()
92 .setNameFormat("DOMNotificationRouter-observer-%d")
94 executor = Executors.newCachedThreadPool(new ThreadFactoryBuilder()
96 .setNameFormat("DOMNotificationRouter-listeners-%d")
98 queueNotificationManager = new EqualityQueuedNotificationManager<>("DOMNotificationRouter", executor,
99 maxQueueCapacity, DOMNotificationRouter::deliverEvents);
// Configuration-driven constructor: delegates to the int constructor with config.queueDepth() as the queue
// capacity, then logs that the router has started.
103 public DOMNotificationRouter(final Config config) {
104 this(config.queueDepth());
105 LOG.info("DOM Notification Router started");
// Static factory, deprecated for removal. It does nothing beyond calling the int constructor, so callers
// can use new DOMNotificationRouter(maxQueueCapacity) directly.
108 @Deprecated(forRemoval = true)
109 public static DOMNotificationRouter create(final int maxQueueCapacity) {
110 return new DOMNotificationRouter(maxQueueCapacity);
// Registers 'listener' for the notification types in 'types'. The method is synchronized on the router,
// and the registration's removeRegistration() takes the same monitor, so the two map replacements visible
// here are serialized against each other.
114 public synchronized <T extends DOMNotificationListener> ListenerRegistration<T> registerNotificationListener(
115 final T listener, final Collection<Absolute> types) {
116 final AbstractListenerRegistration<T> reg = new AbstractListenerRegistration<>(listener) {
118 protected void removeRegistration() {
119 synchronized (DOMNotificationRouter.this) {
// Install an immutable copy of the current map with this registration filtered out. The comparison is by
// identity (!=), so only this exact registration object is removed.
120 replaceListeners(ImmutableMultimap.copyOf(Multimaps.filterValues(listeners,
121 input -> input != this)));
// An empty 'types' collection skips the map rebuild below.
126 if (!types.isEmpty()) {
127 final Builder<Absolute, AbstractListenerRegistration<? extends DOMNotificationListener>> b =
128 ImmutableMultimap.builder();
// NOTE(review): the statements that populate the builder (existing entries and one entry per type) and the
// method's return statement are not visible in this excerpt.
131 for (final Absolute t : types) {
135 replaceListeners(b.build());
142 * Swaps registered listeners and triggers notification update.
144 * @param newListeners the new listener map to install; its key set is reported to subscription listeners
146 private void replaceListeners(
147 final Multimap<Absolute, AbstractListenerRegistration<? extends DOMNotificationListener>> newListeners) {
// 'listeners' is a volatile field, so this single assignment swaps in the whole new map at once.
148 listeners = newListeners;
// Report the types that now have at least one registration to the subscription listeners.
149 notifyListenerTypesChanged(newListeners.keySet());
// Tells the registered subscription listeners which notification types are subscribed after a change.
// The listener list is snapshotted on the calling thread; the callbacks themselves run on 'executor'.
152 @SuppressWarnings("checkstyle:IllegalCatch")
153 private void notifyListenerTypesChanged(final Set<Absolute> typesAfter) {
154 final List<? extends DOMNotificationSubscriptionListener> listenersAfter =
155 subscriptionListeners.streamListeners().collect(ImmutableList.toImmutableList());
156 executor.execute(() -> {
157 for (final DOMNotificationSubscriptionListener subListener : listenersAfter) {
159 subListener.onSubscriptionChanged(typesAfter);
// Any Exception from a listener callback is caught and logged at WARN together with the listener;
// the broad catch is why the checkstyle IllegalCatch warning is suppressed on this method.
160 } catch (final Exception e) {
161 LOG.warn("Uncaught exception during invoking listener {}", subListener, e);
// Registers a subscription listener and schedules an initial onSubscriptionChanged callback for it.
// NOTE(review): the parameter-list line is not visible in this excerpt; 'listener' is the argument used below.
168 public <L extends DOMNotificationSubscriptionListener> ListenerRegistration<L> registerSubscriptionListener(
// Key set of the listeners map as read at call time, i.e. the types that currently have registrations.
170 final Set<Absolute> initialTypes = listeners.keySet();
// The initial callback runs on 'executor', not on the calling thread.
// NOTE(review): unlike notifyListenerTypesChanged, no catch around this callback is visible; confirm how an
// exception thrown by the listener is handled here.
171 executor.execute(() -> listener.onSubscriptionChanged(initialTypes));
172 return subscriptionListeners.register(listener);
// Submits 'notification' to every registration in 'subscribers' and returns a future covering all of them.
// Package-private (no access modifier).
177 ListenableFuture<? extends Object> publish(final DOMNotification notification,
178 final Collection<AbstractListenerRegistration<? extends DOMNotificationListener>> subscribers) {
179 final List<ListenableFuture<Void>> futures = new ArrayList<>(subscribers.size());
180 subscribers.forEach(subscriber -> {
// Each subscriber gets its own event object, and therefore its own future.
181 final DOMNotificationRouterEvent event = new DOMNotificationRouterEvent(notification);
182 futures.add(event.future());
183 queueNotificationManager.submitNotification(subscriber, event);
// Combine the per-subscriber futures and map the combined result to null. Per Guava's documentation,
// successfulAsList does not fail when an individual input fails. The transformation runs on a direct executor.
185 return Futures.transform(Futures.successfulAsList(futures), ignored -> (Void)null,
186 MoreExecutors.directExecutor());
// Looks up the registrations for the notification's type in the current listeners map and publishes to them.
// NOTE(review): none of the calls visible in this method declares InterruptedException; confirm whether the
// throws clause is only there to satisfy the implemented interface.
190 public ListenableFuture<? extends Object> putNotification(final DOMNotification notification)
191 throws InterruptedException {
192 final Collection<AbstractListenerRegistration<? extends DOMNotificationListener>> subscribers =
193 listeners.get(notification.getType());
// NOTE(review): the body of this empty-subscriber branch is not visible in this excerpt.
194 if (subscribers.isEmpty()) {
198 return publish(notification, subscribers);
// Variant without a timeout: looks up the registrations for the notification's type and publishes to them.
// The visible lines are the same lookup-then-publish sequence as putNotification.
202 public ListenableFuture<? extends Object> offerNotification(final DOMNotification notification) {
203 final Collection<AbstractListenerRegistration<? extends DOMNotificationListener>> subscribers =
204 listeners.get(notification.getType());
// NOTE(review): the body of this empty-subscriber branch is not visible in this excerpt.
205 if (subscribers.isEmpty()) {
209 return publish(notification, subscribers);
// Timed variant: publishes once and, if that result equals REJECTED, retries through putNotification while a
// timer task is armed to interrupt the calling thread after 'timeout'.
// NOTE(review): several lines of this method (branch bodies, the try opener, returns) are not visible here.
213 public ListenableFuture<? extends Object> offerNotification(final DOMNotification notification, final long timeout,
214 final TimeUnit unit) throws InterruptedException {
215 final Collection<AbstractListenerRegistration<? extends DOMNotificationListener>> subscribers =
216 listeners.get(notification.getType());
217 if (subscribers.isEmpty()) {
220 // Attempt to perform a non-blocking publish first
221 final ListenableFuture<?> noBlock = publish(notification, subscribers);
// NOTE(review): publish(...) as visible in this file always returns a freshly created Futures.transform(...)
// result; confirm whether that can ever equal REJECTED, otherwise the retry path below is never reached.
222 if (!DOMNotificationPublishService.REJECTED.equals(noBlock)) {
// Remember the publishing thread so the scheduled task can interrupt it once 'timeout' has elapsed.
227 final Thread publishThread = Thread.currentThread();
228 ScheduledFuture<?> timerTask = observer.schedule(publishThread::interrupt, timeout, unit);
229 final ListenableFuture<?> withBlock = putNotification(notification);
// The publish returned, so the pending interrupt is no longer wanted.
// NOTE(review): if the timer fires between the return above and this cancel, the thread would still be
// interrupted; confirm whether that window matters.
230 timerTask.cancel(true);
// NOTE(review): the body of this check (scheduler queue size above 50) is not visible; presumably housekeeping
// of cancelled timer tasks, so confirm.
231 if (observer.getQueue().size() > 50) {
// An interrupt is reported to the caller as REJECTED rather than rethrown.
// NOTE(review): the visible lines do not restore the interrupt flag, and an interrupt from another source would
// look the same as a timeout; confirm this is intended.
235 } catch (InterruptedException e) {
236 return DOMNotificationPublishService.REJECTED;
// Closes the router and logs that it has stopped.
// NOTE(review): the statements between the signature and the log call are not visible in this excerpt; confirm
// that both 'observer' and 'executor' are shut down there.
243 public void close() {
246 LOG.info("DOM Notification Router stopped");
// Package-private accessors (no access modifier).
// NOTE(review): the bodies of executor(), observer() and listeners() are not visible in this excerpt; presumably
// each returns the field of the same name, so confirm.
250 ExecutorService executor() {
255 ExecutorService observer() {
260 Multimap<Absolute, ?> listeners() {
// Returns the registry that holds the subscription listeners.
265 ListenerRegistry<DOMNotificationSubscriptionListener> subscriptionListeners() {
266 return subscriptionListeners;
// Delivery callback handed to the EqualityQueuedNotificationManager in the constructor: receives one listener
// registration together with a batch of events queued for it.
269 private static void deliverEvents(final AbstractListenerRegistration<? extends DOMNotificationListener> reg,
270 final ImmutableList<DOMNotificationRouterEvent> events) {
// Events are delivered to the listener only while its registration is still open.
271 if (reg.notClosed()) {
272 final DOMNotificationListener listener = reg.getInstance();
273 for (DOMNotificationRouterEvent event : events) {
274 event.deliverTo(listener);
// NOTE(review): the lines between the loop and this call are not visible, so it is unclear from here whether
// clear() runs for every batch or only when the registration is already closed; confirm.
277 events.forEach(DOMNotificationRouterEvent::clear);