Replace lmax disruptor with QueuedNotificationManager
[mdsal.git] / dom / mdsal-dom-broker / src / main / java / org / opendaylight / mdsal / dom / broker / DOMNotificationRouter.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.mdsal.dom.broker;
9
10 import com.google.common.annotations.VisibleForTesting;
11 import com.google.common.collect.ImmutableList;
12 import com.google.common.collect.ImmutableMultimap;
13 import com.google.common.collect.ImmutableMultimap.Builder;
14 import com.google.common.collect.Multimap;
15 import com.google.common.collect.Multimaps;
16 import com.google.common.util.concurrent.Futures;
17 import com.google.common.util.concurrent.ListenableFuture;
18 import com.google.common.util.concurrent.MoreExecutors;
19 import com.google.common.util.concurrent.ThreadFactoryBuilder;
20 import java.util.ArrayList;
21 import java.util.Collection;
22 import java.util.List;
23 import java.util.Set;
24 import java.util.concurrent.ExecutorService;
25 import java.util.concurrent.Executors;
26 import java.util.concurrent.ScheduledFuture;
27 import java.util.concurrent.ScheduledThreadPoolExecutor;
28 import java.util.concurrent.TimeUnit;
29 import org.opendaylight.mdsal.dom.api.DOMNotification;
30 import org.opendaylight.mdsal.dom.api.DOMNotificationListener;
31 import org.opendaylight.mdsal.dom.api.DOMNotificationPublishService;
32 import org.opendaylight.mdsal.dom.api.DOMNotificationService;
33 import org.opendaylight.mdsal.dom.spi.DOMNotificationSubscriptionListener;
34 import org.opendaylight.mdsal.dom.spi.DOMNotificationSubscriptionListenerRegistry;
35 import org.opendaylight.yangtools.concepts.AbstractListenerRegistration;
36 import org.opendaylight.yangtools.concepts.ListenerRegistration;
37 import org.opendaylight.yangtools.util.ListenerRegistry;
38 import org.opendaylight.yangtools.util.concurrent.EqualityQueuedNotificationManager;
39 import org.opendaylight.yangtools.util.concurrent.FluentFutures;
40 import org.opendaylight.yangtools.util.concurrent.QueuedNotificationManager;
41 import org.opendaylight.yangtools.yang.model.api.stmt.SchemaNodeIdentifier.Absolute;
42 import org.slf4j.Logger;
43 import org.slf4j.LoggerFactory;
44
45 /**
46  * Joint implementation of {@link DOMNotificationPublishService} and {@link DOMNotificationService}. Provides
47  * routing of notifications from publishers to subscribers.
48  *
49  *<p>
50  * The internal implementation is done using a {@link QueuedNotificationManager}.
51  *</p>
52  */
53 public class DOMNotificationRouter implements AutoCloseable, DOMNotificationPublishService,
54         DOMNotificationService, DOMNotificationSubscriptionListenerRegistry {
55
56     private static final Logger LOG = LoggerFactory.getLogger(DOMNotificationRouter.class);
57     private static final ListenableFuture<Void> NO_LISTENERS = FluentFutures.immediateNullFluentFuture();
58
59     private final ListenerRegistry<DOMNotificationSubscriptionListener> subscriptionListeners =
60             ListenerRegistry.create();
61     private final EqualityQueuedNotificationManager<AbstractListenerRegistration<? extends DOMNotificationListener>,
62                 DOMNotificationRouterEvent> queueNotificationManager;
63     private final ScheduledThreadPoolExecutor observer;
64     private final ExecutorService executor;
65
66     private volatile Multimap<Absolute, AbstractListenerRegistration<? extends DOMNotificationListener>> listeners =
67             ImmutableMultimap.of();
68
69     @VisibleForTesting
70     DOMNotificationRouter(int maxQueueCapacity) {
71         observer = new ScheduledThreadPoolExecutor(1,
72             new ThreadFactoryBuilder().setDaemon(true).setNameFormat("DOMNotificationRouter-observer-%d").build());
73         executor = Executors.newCachedThreadPool(
74             new ThreadFactoryBuilder().setDaemon(true).setNameFormat("DOMNotificationRouter-listeners-%d").build());
75         queueNotificationManager = new EqualityQueuedNotificationManager<>("DOMNotificationRouter", executor,
76                 maxQueueCapacity, DOMNotificationRouter::deliverEvents);
77     }
78
79     public static DOMNotificationRouter create(int maxQueueCapacity) {
80         return new DOMNotificationRouter(maxQueueCapacity);
81     }
82
83     @Override
84     public synchronized <T extends DOMNotificationListener> ListenerRegistration<T> registerNotificationListener(
85             final T listener, final Collection<Absolute> types) {
86         final AbstractListenerRegistration<T> reg = new AbstractListenerRegistration<>(listener) {
87             @Override
88             protected void removeRegistration() {
89                 synchronized (DOMNotificationRouter.this) {
90                     replaceListeners(ImmutableMultimap.copyOf(Multimaps.filterValues(listeners,
91                         input -> input != this)));
92                 }
93             }
94         };
95
96         if (!types.isEmpty()) {
97             final Builder<Absolute, AbstractListenerRegistration<? extends DOMNotificationListener>> b =
98                     ImmutableMultimap.builder();
99             b.putAll(listeners);
100
101             for (final Absolute t : types) {
102                 b.put(t, reg);
103             }
104
105             replaceListeners(b.build());
106         }
107
108         return reg;
109     }
110
111     /**
112      * Swaps registered listeners and triggers notification update.
113      *
114      * @param newListeners is used to notify listenerTypes changed
115      */
116     private void replaceListeners(
117             final Multimap<Absolute, AbstractListenerRegistration<? extends DOMNotificationListener>> newListeners) {
118         listeners = newListeners;
119         notifyListenerTypesChanged(newListeners.keySet());
120     }
121
122     @SuppressWarnings("checkstyle:IllegalCatch")
123     private void notifyListenerTypesChanged(final Set<Absolute> typesAfter) {
124         final List<? extends DOMNotificationSubscriptionListener> listenersAfter =
125                 subscriptionListeners.streamListeners().collect(ImmutableList.toImmutableList());
126         executor.execute(() -> {
127             for (final DOMNotificationSubscriptionListener subListener : listenersAfter) {
128                 try {
129                     subListener.onSubscriptionChanged(typesAfter);
130                 } catch (final Exception e) {
131                     LOG.warn("Uncaught exception during invoking listener {}", subListener, e);
132                 }
133             }
134         });
135     }
136
137     @Override
138     public <L extends DOMNotificationSubscriptionListener> ListenerRegistration<L> registerSubscriptionListener(
139             final L listener) {
140         final Set<Absolute> initialTypes = listeners.keySet();
141         executor.execute(() -> listener.onSubscriptionChanged(initialTypes));
142         return subscriptionListeners.register(listener);
143     }
144
145
146     @VisibleForTesting
147     ListenableFuture<? extends Object> publish(DOMNotification notification,
148             final Collection<AbstractListenerRegistration<? extends DOMNotificationListener>> subscribers) {
149         final List<ListenableFuture<Void>> futures = new ArrayList<>(subscribers.size());
150         subscribers.forEach(subscriber -> {
151             final DOMNotificationRouterEvent event = new DOMNotificationRouterEvent(notification);
152             futures.add(event.future());
153             queueNotificationManager.submitNotification(subscriber, event);
154         });
155         return Futures.transform(Futures.successfulAsList(futures), ignored -> (Void)null,
156             MoreExecutors.directExecutor());
157     }
158
159     @Override
160     public ListenableFuture<? extends Object> putNotification(final DOMNotification notification)
161             throws InterruptedException {
162         final Collection<AbstractListenerRegistration<? extends DOMNotificationListener>> subscribers =
163                 listeners.get(notification.getType());
164         if (subscribers.isEmpty()) {
165             return NO_LISTENERS;
166         }
167
168         return publish(notification, subscribers);
169     }
170
171     @Override
172     public ListenableFuture<? extends Object> offerNotification(final DOMNotification notification) {
173         final Collection<AbstractListenerRegistration<? extends DOMNotificationListener>> subscribers =
174                 listeners.get(notification.getType());
175         if (subscribers.isEmpty()) {
176             return NO_LISTENERS;
177         }
178
179         return publish(notification, subscribers);
180     }
181
182     @Override
183     public ListenableFuture<? extends Object> offerNotification(final DOMNotification notification, final long timeout,
184             final TimeUnit unit) throws InterruptedException {
185         final Collection<AbstractListenerRegistration<? extends DOMNotificationListener>> subscribers =
186                 listeners.get(notification.getType());
187         if (subscribers.isEmpty()) {
188             return NO_LISTENERS;
189         }
190         // Attempt to perform a non-blocking publish first
191         final ListenableFuture<?> noBlock = publish(notification, subscribers);
192         if (!DOMNotificationPublishService.REJECTED.equals(noBlock)) {
193             return noBlock;
194         }
195
196         try {
197             final Thread publishThread = Thread.currentThread();
198             ScheduledFuture<?> timerTask = observer.schedule(publishThread::interrupt, timeout, unit);
199             final ListenableFuture<?> withBlock = putNotification(notification);
200             timerTask.cancel(true);
201             if (observer.getQueue().size() > 50) {
202                 observer.purge();
203             }
204             return withBlock;
205         } catch (InterruptedException e) {
206             return DOMNotificationPublishService.REJECTED;
207         }
208     }
209
210     @Override
211     public void close() {
212         observer.shutdown();
213         executor.shutdown();
214     }
215
216     @VisibleForTesting
217     ExecutorService executor() {
218         return executor;
219     }
220
221     @VisibleForTesting
222     ExecutorService observer() {
223         return observer;
224     }
225
226     @VisibleForTesting
227     Multimap<Absolute, ?> listeners() {
228         return listeners;
229     }
230
231     @VisibleForTesting
232     ListenerRegistry<DOMNotificationSubscriptionListener> subscriptionListeners() {
233         return subscriptionListeners;
234     }
235
236     private static void deliverEvents(final AbstractListenerRegistration<? extends DOMNotificationListener> reg,
237             final ImmutableList<DOMNotificationRouterEvent> events) {
238         if (reg.notClosed()) {
239             final DOMNotificationListener listener = reg.getInstance();
240             for (DOMNotificationRouterEvent event : events) {
241                 event.deliverTo(listener);
242             }
243         } else {
244             events.forEach(DOMNotificationRouterEvent::clear);
245         }
246     }
247 }