Replace SchemaPath with SchemaNodeIdentifier.Absolute/QName
[mdsal.git] / dom / mdsal-dom-broker / src / main / java / org / opendaylight / mdsal / dom / broker / DOMNotificationRouter.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.mdsal.dom.broker;
9
10 import static com.google.common.base.Preconditions.checkArgument;
11
12 import com.google.common.annotations.VisibleForTesting;
13 import com.google.common.collect.ImmutableList;
14 import com.google.common.collect.ImmutableMultimap;
15 import com.google.common.collect.ImmutableMultimap.Builder;
16 import com.google.common.collect.Multimap;
17 import com.google.common.collect.Multimaps;
18 import com.google.common.util.concurrent.ListenableFuture;
19 import com.google.common.util.concurrent.ThreadFactoryBuilder;
20 import com.lmax.disruptor.EventHandler;
21 import com.lmax.disruptor.InsufficientCapacityException;
22 import com.lmax.disruptor.PhasedBackoffWaitStrategy;
23 import com.lmax.disruptor.WaitStrategy;
24 import com.lmax.disruptor.dsl.Disruptor;
25 import com.lmax.disruptor.dsl.ProducerType;
26 import java.util.Collection;
27 import java.util.List;
28 import java.util.Set;
29 import java.util.concurrent.ExecutorService;
30 import java.util.concurrent.Executors;
31 import java.util.concurrent.ScheduledFuture;
32 import java.util.concurrent.ScheduledThreadPoolExecutor;
33 import java.util.concurrent.TimeUnit;
34 import org.opendaylight.mdsal.dom.api.DOMNotification;
35 import org.opendaylight.mdsal.dom.api.DOMNotificationListener;
36 import org.opendaylight.mdsal.dom.api.DOMNotificationPublishService;
37 import org.opendaylight.mdsal.dom.api.DOMNotificationService;
38 import org.opendaylight.mdsal.dom.spi.DOMNotificationSubscriptionListener;
39 import org.opendaylight.mdsal.dom.spi.DOMNotificationSubscriptionListenerRegistry;
40 import org.opendaylight.yangtools.concepts.AbstractListenerRegistration;
41 import org.opendaylight.yangtools.concepts.ListenerRegistration;
42 import org.opendaylight.yangtools.util.ListenerRegistry;
43 import org.opendaylight.yangtools.util.concurrent.FluentFutures;
44 import org.opendaylight.yangtools.yang.model.api.stmt.SchemaNodeIdentifier.Absolute;
45 import org.slf4j.Logger;
46 import org.slf4j.LoggerFactory;
47
48 /**
49  * Joint implementation of {@link DOMNotificationPublishService} and {@link DOMNotificationService}. Provides
50  * routing of notifications from publishers to subscribers.
51  *
52  *<p>
53  * Internal implementation works by allocating a two-handler Disruptor. The first handler delivers notifications
54  * to subscribed listeners and the second one notifies whoever may be listening on the returned future. Registration
55  * state tracking is performed by a simple immutable multimap -- when a registration or unregistration occurs we
56  * re-generate the entire map from scratch and set it atomically. While registrations/unregistrations synchronize
57  * on this instance, notifications do not take any locks here.
58  *
59  *<p>
60  * The fully-blocking {@link #publish(long, DOMNotification, Collection)}
61  * and non-blocking {@link #offerNotification(DOMNotification)}
62  * are realized using the Disruptor's native operations. The bounded-blocking {@link
63  * #offerNotification(DOMNotification, long, TimeUnit)}
64  * is realized by arming a background wakeup interrupt.
65  */
66 public class DOMNotificationRouter implements AutoCloseable, DOMNotificationPublishService,
67         DOMNotificationService, DOMNotificationSubscriptionListenerRegistry {
68
69     private static final Logger LOG = LoggerFactory.getLogger(DOMNotificationRouter.class);
70     private static final ListenableFuture<Void> NO_LISTENERS = FluentFutures.immediateNullFluentFuture();
71     private static final WaitStrategy DEFAULT_STRATEGY = PhasedBackoffWaitStrategy.withLock(
72             1L, 30L, TimeUnit.MILLISECONDS);
73     private static final EventHandler<DOMNotificationRouterEvent> DISPATCH_NOTIFICATIONS =
74         (event, sequence, endOfBatch) -> event.deliverNotification();
75     private static final EventHandler<DOMNotificationRouterEvent> NOTIFY_FUTURE =
76         (event, sequence, endOfBatch) -> event.setFuture();
77
78     private final ListenerRegistry<DOMNotificationSubscriptionListener> subscriptionListeners =
79             ListenerRegistry.create();
80     private final Disruptor<DOMNotificationRouterEvent> disruptor;
81     private final ScheduledThreadPoolExecutor observer;
82     private final ExecutorService executor;
83
84     private volatile Multimap<Absolute, AbstractListenerRegistration<? extends DOMNotificationListener>> listeners =
85             ImmutableMultimap.of();
86
87     @VisibleForTesting
88     DOMNotificationRouter(final int queueDepth, final WaitStrategy strategy) {
89         observer = new ScheduledThreadPoolExecutor(1,
90             new ThreadFactoryBuilder().setDaemon(true).setNameFormat("DOMNotificationRouter-observer-%d").build());
91         executor = Executors.newCachedThreadPool(
92             new ThreadFactoryBuilder().setDaemon(true).setNameFormat("DOMNotificationRouter-listeners-%d").build());
93         disruptor = new Disruptor<>(DOMNotificationRouterEvent.FACTORY, queueDepth,
94                 new ThreadFactoryBuilder().setNameFormat("DOMNotificationRouter-disruptor-%d").build(),
95                 ProducerType.MULTI, strategy);
96         disruptor.handleEventsWith(DISPATCH_NOTIFICATIONS);
97         disruptor.after(DISPATCH_NOTIFICATIONS).handleEventsWith(NOTIFY_FUTURE);
98         disruptor.start();
99     }
100
101     public static DOMNotificationRouter create(final int queueDepth) {
102         return new DOMNotificationRouter(queueDepth, DEFAULT_STRATEGY);
103     }
104
105     public static DOMNotificationRouter create(final int queueDepth, final long spinTime, final long parkTime,
106             final TimeUnit unit) {
107         checkArgument(Long.lowestOneBit(queueDepth) == Long.highestOneBit(queueDepth),
108                 "Queue depth %s is not power-of-two", queueDepth);
109         return new DOMNotificationRouter(queueDepth, PhasedBackoffWaitStrategy.withLock(spinTime, parkTime, unit));
110     }
111
112     @Override
113     public synchronized <T extends DOMNotificationListener> ListenerRegistration<T> registerNotificationListener(
114             final T listener, final Collection<Absolute> types) {
115         final AbstractListenerRegistration<T> reg = new AbstractListenerRegistration<>(listener) {
116             @Override
117             protected void removeRegistration() {
118                 synchronized (DOMNotificationRouter.this) {
119                     replaceListeners(ImmutableMultimap.copyOf(Multimaps.filterValues(listeners,
120                         input -> input != this)));
121                 }
122             }
123         };
124
125         if (!types.isEmpty()) {
126             final Builder<Absolute, AbstractListenerRegistration<? extends DOMNotificationListener>> b =
127                     ImmutableMultimap.builder();
128             b.putAll(listeners);
129
130             for (final Absolute t : types) {
131                 b.put(t, reg);
132             }
133
134             replaceListeners(b.build());
135         }
136
137         return reg;
138     }
139
140     /**
141      * Swaps registered listeners and triggers notification update.
142      *
143      * @param newListeners is used to notify listenerTypes changed
144      */
145     private void replaceListeners(
146             final Multimap<Absolute, AbstractListenerRegistration<? extends DOMNotificationListener>> newListeners) {
147         listeners = newListeners;
148         notifyListenerTypesChanged(newListeners.keySet());
149     }
150
151     @SuppressWarnings("checkstyle:IllegalCatch")
152     private void notifyListenerTypesChanged(final Set<Absolute> typesAfter) {
153         final List<? extends DOMNotificationSubscriptionListener> listenersAfter =
154                 subscriptionListeners.streamListeners().collect(ImmutableList.toImmutableList());
155         executor.execute(() -> {
156             for (final DOMNotificationSubscriptionListener subListener : listenersAfter) {
157                 try {
158                     subListener.onSubscriptionChanged(typesAfter);
159                 } catch (final Exception e) {
160                     LOG.warn("Uncaught exception during invoking listener {}", subListener, e);
161                 }
162             }
163         });
164     }
165
166     @Override
167     public <L extends DOMNotificationSubscriptionListener> ListenerRegistration<L> registerSubscriptionListener(
168             final L listener) {
169         final Set<Absolute> initialTypes = listeners.keySet();
170         executor.execute(() -> listener.onSubscriptionChanged(initialTypes));
171         return subscriptionListeners.register(listener);
172     }
173
174     private ListenableFuture<Void> publish(final long seq, final DOMNotification notification,
175             final Collection<AbstractListenerRegistration<? extends DOMNotificationListener>> subscribers) {
176         final DOMNotificationRouterEvent event = disruptor.get(seq);
177         final ListenableFuture<Void> future = event.initialize(notification, subscribers);
178         disruptor.getRingBuffer().publish(seq);
179         return future;
180     }
181
182     @Override
183     public ListenableFuture<? extends Object> putNotification(final DOMNotification notification)
184             throws InterruptedException {
185         final Collection<AbstractListenerRegistration<? extends DOMNotificationListener>> subscribers =
186                 listeners.get(notification.getType());
187         if (subscribers.isEmpty()) {
188             return NO_LISTENERS;
189         }
190
191         final long seq = disruptor.getRingBuffer().next();
192         return publish(seq, notification, subscribers);
193     }
194
195     @SuppressWarnings("checkstyle:IllegalCatch")
196     @VisibleForTesting
197     ListenableFuture<? extends Object> tryPublish(final DOMNotification notification,
198             final Collection<AbstractListenerRegistration<? extends DOMNotificationListener>> subscribers) {
199         final long seq;
200         try {
201             seq = disruptor.getRingBuffer().tryNext();
202         } catch (final InsufficientCapacityException e) {
203             return DOMNotificationPublishService.REJECTED;
204         }
205
206         return publish(seq, notification, subscribers);
207     }
208
209     @Override
210     public ListenableFuture<? extends Object> offerNotification(final DOMNotification notification) {
211         final Collection<AbstractListenerRegistration<? extends DOMNotificationListener>> subscribers =
212                 listeners.get(notification.getType());
213         if (subscribers.isEmpty()) {
214             return NO_LISTENERS;
215         }
216
217         return tryPublish(notification, subscribers);
218     }
219
220     @Override
221     public ListenableFuture<? extends Object> offerNotification(final DOMNotification notification, final long timeout,
222             final TimeUnit unit) throws InterruptedException {
223         final Collection<AbstractListenerRegistration<? extends DOMNotificationListener>> subscribers =
224                 listeners.get(notification.getType());
225         if (subscribers.isEmpty()) {
226             return NO_LISTENERS;
227         }
228         // Attempt to perform a non-blocking publish first
229         final ListenableFuture<?> noBlock = tryPublish(notification, subscribers);
230         if (!DOMNotificationPublishService.REJECTED.equals(noBlock)) {
231             return noBlock;
232         }
233
234         try {
235             final Thread publishThread = Thread.currentThread();
236             ScheduledFuture<?> timerTask = observer.schedule(publishThread::interrupt, timeout, unit);
237             final ListenableFuture<?> withBlock = putNotification(notification);
238             timerTask.cancel(true);
239             if (observer.getQueue().size() > 50) {
240                 observer.purge();
241             }
242             return withBlock;
243         } catch (InterruptedException e) {
244             return DOMNotificationPublishService.REJECTED;
245         }
246     }
247
248     @Override
249     public void close() {
250         disruptor.shutdown();
251         observer.shutdown();
252         executor.shutdown();
253     }
254
255     @VisibleForTesting
256     ExecutorService executor() {
257         return executor;
258     }
259
260     @VisibleForTesting
261     ExecutorService observer() {
262         return observer;
263     }
264
265     @VisibleForTesting
266     Multimap<Absolute, ?> listeners() {
267         return listeners;
268     }
269
270     @VisibleForTesting
271     ListenerRegistry<DOMNotificationSubscriptionListener> subscriptionListeners() {
272         return subscriptionListeners;
273     }
274 }