checkStyleViolationSeverity=error implemented for mdsal-dom-broker
[mdsal.git] / dom / mdsal-dom-broker / src / main / java / org / opendaylight / mdsal / dom / broker / DOMNotificationRouter.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.mdsal.dom.broker;
9
10 import com.google.common.base.Preconditions;
11 import com.google.common.base.Predicate;
12 import com.google.common.collect.ImmutableList;
13 import com.google.common.collect.ImmutableMultimap;
14 import com.google.common.collect.ImmutableMultimap.Builder;
15 import com.google.common.collect.Multimap;
16 import com.google.common.collect.Multimaps;
17 import com.google.common.util.concurrent.Futures;
18 import com.google.common.util.concurrent.ListenableFuture;
19 import com.lmax.disruptor.EventHandler;
20 import com.lmax.disruptor.InsufficientCapacityException;
21 import com.lmax.disruptor.PhasedBackoffWaitStrategy;
22 import com.lmax.disruptor.WaitStrategy;
23 import com.lmax.disruptor.dsl.Disruptor;
24 import com.lmax.disruptor.dsl.ProducerType;
25 import java.util.Arrays;
26 import java.util.Collection;
27 import java.util.List;
28 import java.util.Set;
29 import java.util.concurrent.ExecutorService;
30 import java.util.concurrent.Executors;
31 import java.util.concurrent.TimeUnit;
32 import org.opendaylight.mdsal.dom.api.DOMNotification;
33 import org.opendaylight.mdsal.dom.api.DOMNotificationListener;
34 import org.opendaylight.mdsal.dom.api.DOMNotificationPublishService;
35 import org.opendaylight.mdsal.dom.api.DOMNotificationService;
36 import org.opendaylight.mdsal.dom.spi.DOMNotificationSubscriptionListener;
37 import org.opendaylight.mdsal.dom.spi.DOMNotificationSubscriptionListenerRegistry;
38 import org.opendaylight.yangtools.concepts.AbstractListenerRegistration;
39 import org.opendaylight.yangtools.concepts.ListenerRegistration;
40 import org.opendaylight.yangtools.util.ListenerRegistry;
41 import org.opendaylight.yangtools.yang.model.api.SchemaPath;
42 import org.slf4j.Logger;
43 import org.slf4j.LoggerFactory;
44
45 /**
46  * Joint implementation of {@link DOMNotificationPublishService} and {@link DOMNotificationService}. Provides
47  * routing of notifications from publishers to subscribers.
48  *
49  *<p>
50  * Internal implementation works by allocating a two-handler Disruptor. The first handler delivers notifications
51  * to subscribed listeners and the second one notifies whoever may be listening on the returned future. Registration
52  * state tracking is performed by a simple immutable multimap -- when a registration or unregistration occurs we
53  * re-generate the entire map from scratch and set it atomically. While registrations/unregistrations synchronize
54  * on this instance, notifications do not take any locks here.
55  *
56  *<p>
57  * The fully-blocking {@link #publish(long, DOMNotification, Collection)}
58  * and non-blocking {@link #offerNotification(DOMNotification)}
59  * are realized using the Disruptor's native operations. The bounded-blocking {@link
60  * #offerNotification(DOMNotification, long, TimeUnit)}
61  * is realized by arming a background wakeup interrupt.
62  */
63 public final class DOMNotificationRouter implements AutoCloseable, DOMNotificationPublishService,
64         DOMNotificationService, DOMNotificationSubscriptionListenerRegistry {
65
66     private static final Logger LOG = LoggerFactory.getLogger(DOMNotificationRouter.class);
67     private static final ListenableFuture<Void> NO_LISTENERS = Futures.immediateFuture(null);
68     private static final WaitStrategy DEFAULT_STRATEGY = PhasedBackoffWaitStrategy.withLock(
69             1L, 30L, TimeUnit.MILLISECONDS);
70     private static final EventHandler<DOMNotificationRouterEvent> DISPATCH_NOTIFICATIONS =
71             new EventHandler<DOMNotificationRouterEvent>() {
72         @Override
73         public void onEvent(final DOMNotificationRouterEvent event, final long sequence,
74                 final boolean endOfBatch) throws Exception {
75             event.deliverNotification();
76
77         }
78     };
79     private static final EventHandler<DOMNotificationRouterEvent> NOTIFY_FUTURE =
80             new EventHandler<DOMNotificationRouterEvent>() {
81         @Override
82         public void onEvent(final DOMNotificationRouterEvent event, final long sequence, final boolean endOfBatch) {
83             event.setFuture();
84         }
85     };
86
87     private final Disruptor<DOMNotificationRouterEvent> disruptor;
88     private final ExecutorService executor;
89     private volatile Multimap<SchemaPath, ListenerRegistration<? extends
90             DOMNotificationListener>> listeners = ImmutableMultimap.of();
91     private final ListenerRegistry<DOMNotificationSubscriptionListener> subscriptionListeners =
92             ListenerRegistry.create();
93
94     @SuppressWarnings("unchecked")
95     private DOMNotificationRouter(final ExecutorService executor, final int queueDepth, final WaitStrategy strategy) {
96         this.executor = Preconditions.checkNotNull(executor);
97
98         disruptor = new Disruptor<>(DOMNotificationRouterEvent.FACTORY,
99                 queueDepth, executor, ProducerType.MULTI, strategy);
100         disruptor.handleEventsWith(DISPATCH_NOTIFICATIONS);
101         disruptor.after(DISPATCH_NOTIFICATIONS).handleEventsWith(NOTIFY_FUTURE);
102         disruptor.start();
103     }
104
105     public static DOMNotificationRouter create(final int queueDepth) {
106         final ExecutorService executor = Executors.newCachedThreadPool();
107
108         return new DOMNotificationRouter(executor, queueDepth, DEFAULT_STRATEGY);
109     }
110
111     public static DOMNotificationRouter create(final int queueDepth, final long spinTime,
112             final long parkTime, final TimeUnit unit) {
113         final ExecutorService executor = Executors.newCachedThreadPool();
114         final WaitStrategy strategy = PhasedBackoffWaitStrategy.withLock(spinTime, parkTime, unit);
115
116         return new DOMNotificationRouter(executor, queueDepth, strategy);
117     }
118
119     @Override
120     public synchronized <T extends DOMNotificationListener> ListenerRegistration<T> registerNotificationListener(
121             final T listener, final Collection<SchemaPath> types) {
122         final ListenerRegistration<T> reg = new AbstractListenerRegistration<T>(listener) {
123             @Override
124             protected void removeRegistration() {
125                 final ListenerRegistration<T> me = this;
126
127                 synchronized (DOMNotificationRouter.this) {
128                     replaceListeners(ImmutableMultimap.copyOf(Multimaps.filterValues(listeners,
129                             new Predicate<ListenerRegistration<? extends DOMNotificationListener>>() {
130                             @Override
131                             public boolean apply(final ListenerRegistration<? extends DOMNotificationListener> input) {
132                                 return input != me;
133                             }
134                         })));
135                 }
136             }
137         };
138
139         if (!types.isEmpty()) {
140             final Builder<SchemaPath, ListenerRegistration<? extends DOMNotificationListener>> b =
141                     ImmutableMultimap.builder();
142             b.putAll(listeners);
143
144             for (final SchemaPath t : types) {
145                 b.put(t, reg);
146             }
147
148             replaceListeners(b.build());
149         }
150
151         return reg;
152     }
153
154     @Override
155     public <T extends DOMNotificationListener> ListenerRegistration<T> registerNotificationListener(
156             final T listener, final SchemaPath... types) {
157         return registerNotificationListener(listener, Arrays.asList(types));
158     }
159
160     /**
161      * Swaps registered listeners and triggers notification update.
162      *
163      * @param newListeners is used to notify listenerTypes changed
164      */
165     private void replaceListeners(
166             final Multimap<SchemaPath, ListenerRegistration<? extends DOMNotificationListener>> newListeners) {
167         listeners = newListeners;
168         notifyListenerTypesChanged(newListeners.keySet());
169     }
170
171     @SuppressWarnings("checkstyle:IllegalCatch")
172     private void notifyListenerTypesChanged(final Set<SchemaPath> typesAfter) {
173         final List<ListenerRegistration<DOMNotificationSubscriptionListener>> listenersAfter =
174                 ImmutableList.copyOf(subscriptionListeners.getListeners());
175         executor.submit(new Runnable() {
176
177             @Override
178             public void run() {
179                 for (final ListenerRegistration<DOMNotificationSubscriptionListener> subListener : listenersAfter) {
180                     try {
181                         subListener.getInstance().onSubscriptionChanged(typesAfter);
182                     } catch (final Exception e) {
183                         LOG.warn("Uncaught exception during invoking listener {}", subListener.getInstance(), e);
184                     }
185                 }
186             }
187         });
188     }
189
190     @Override
191     public <L extends DOMNotificationSubscriptionListener> ListenerRegistration<L> registerSubscriptionListener(
192             final L listener) {
193         final Set<SchemaPath> initialTypes = listeners.keySet();
194         executor.submit(new Runnable() {
195
196             @Override
197             public void run() {
198                 listener.onSubscriptionChanged(initialTypes);
199             }
200         });
201         return subscriptionListeners.registerWithType(listener);
202     }
203
204     private ListenableFuture<Void> publish(final long seq, final DOMNotification notification,
205             final Collection<ListenerRegistration<? extends DOMNotificationListener>> subscribers) {
206         final DOMNotificationRouterEvent event = disruptor.get(seq);
207         final ListenableFuture<Void> future = event.initialize(notification, subscribers);
208         disruptor.getRingBuffer().publish(seq);
209         return future;
210     }
211
212     @Override
213     public ListenableFuture<? extends Object> putNotification(final DOMNotification notification)
214             throws InterruptedException {
215         final Collection<ListenerRegistration<? extends DOMNotificationListener>> subscribers =
216                 listeners.get(notification.getType());
217         if (subscribers.isEmpty()) {
218             return NO_LISTENERS;
219         }
220
221         final long seq = disruptor.getRingBuffer().next();
222         return publish(seq, notification, subscribers);
223     }
224
225     @SuppressWarnings("checkstyle:IllegalCatch")
226     private ListenableFuture<? extends Object> tryPublish(final DOMNotification notification,
227             final Collection<ListenerRegistration<? extends DOMNotificationListener>> subscribers) {
228         final long seq;
229         try {
230             seq = disruptor.getRingBuffer().tryNext();
231         } catch (final InsufficientCapacityException e) {
232             return DOMNotificationPublishService.REJECTED;
233         }
234
235         return publish(seq, notification, subscribers);
236     }
237
238     @Override
239     public ListenableFuture<? extends Object> offerNotification(final DOMNotification notification) {
240         final Collection<ListenerRegistration<? extends DOMNotificationListener>> subscribers =
241                 listeners.get(notification.getType());
242         if (subscribers.isEmpty()) {
243             return NO_LISTENERS;
244         }
245
246         return tryPublish(notification, subscribers);
247     }
248
249     @Override
250     public ListenableFuture<? extends Object> offerNotification(final DOMNotification notification, final long timeout,
251             final TimeUnit unit) throws InterruptedException {
252         final Collection<ListenerRegistration<? extends DOMNotificationListener>> subscribers =
253                 listeners.get(notification.getType());
254         if (subscribers.isEmpty()) {
255             return NO_LISTENERS;
256         }
257
258         // Attempt to perform a non-blocking publish first
259         final ListenableFuture<? extends Object> noBlock = tryPublish(notification, subscribers);
260         if (!DOMNotificationPublishService.REJECTED.equals(noBlock)) {
261             return noBlock;
262         }
263
264         /*
265          * FIXME: we need a background thread, which will watch out for blocking too long. Here
266          *        we will arm a tasklet for it and synchronize delivery of interrupt properly.
267          */
268         throw new UnsupportedOperationException("Not implemented yet");
269     }
270
271     @Override
272     public void close() {
273         disruptor.shutdown();
274         executor.shutdown();
275     }
276 }