Fix FindBugs violations and enable enforcement in utils
[yangtools.git] / common / util / src / main / java / org / opendaylight / yangtools / util / concurrent / QueuedNotificationManager.java
1 /*
2  * Copyright (c) 2014 Brocade Communications Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.yangtools.util.concurrent;
10
11 import static com.google.common.base.Preconditions.checkArgument;
12 import static java.util.Objects.requireNonNull;
13
14 import com.google.common.collect.ImmutableList;
15 import java.util.ArrayDeque;
16 import java.util.Collection;
17 import java.util.Collections;
18 import java.util.Iterator;
19 import java.util.List;
20 import java.util.Queue;
21 import java.util.concurrent.ConcurrentHashMap;
22 import java.util.concurrent.ConcurrentMap;
23 import java.util.concurrent.Executor;
24 import java.util.concurrent.RejectedExecutionException;
25 import java.util.concurrent.TimeUnit;
26 import java.util.concurrent.locks.Condition;
27 import java.util.concurrent.locks.Lock;
28 import java.util.concurrent.locks.ReentrantLock;
29 import java.util.stream.Collectors;
30 import javax.annotation.Nonnull;
31 import javax.annotation.concurrent.GuardedBy;
32 import org.slf4j.Logger;
33 import org.slf4j.LoggerFactory;
34
35 /**
36  * This class manages queuing and dispatching notifications for multiple listeners concurrently.
37  * Notifications are queued on a per-listener basis and dispatched serially to each listener via an
38  * {@link Executor}.
39  *
40  * <p>This class optimizes its memory footprint by only allocating and maintaining a queue and executor
41  * task for a listener when there are pending notifications. On the first notification(s), a queue
42  * is created and a task is submitted to the executor to dispatch the queue to the associated
43  * listener. Any subsequent notifications that occur before all previous notifications have been
44  * dispatched are appended to the existing queue. When all notifications have been dispatched, the
45  * queue and task are discarded.
46  *
47  * @author Thomas Pantelis
48  *
49  * @param <L> the listener type
50  * @param <N> the notification type
51  */
52 public class QueuedNotificationManager<L, N> implements NotificationManager<L, N> {
53
54     /**
55      * Interface implemented by clients that does the work of invoking listeners with notifications.
56      *
57      * @author Thomas Pantelis
58      *
59      * @param <L> the listener type
60      * @param <N> the notification type
61      *
62      * @deprecated Use {@link QueuedNotificationManager.BatchedInvoker} instead.
63      */
64     @Deprecated
65     @FunctionalInterface
66     public interface Invoker<L, N> {
67         /**
68          * Called to invoke a listener with a notification.
69          *
70          * @param listener the listener to invoke
71          * @param notification the notification to send
72          */
73         void invokeListener(L listener, N notification);
74     }
75
76     @FunctionalInterface
77     public interface BatchedInvoker<L, N> {
78         /**
79          * Called to invoke a listener with a notification.
80          *
81          * @param listener the listener to invoke
82          * @param notifications notifications to send
83          */
84         void invokeListener(@Nonnull L listener, @Nonnull Collection<? extends N> notifications);
85     }
86
87     private static final Logger LOG = LoggerFactory.getLogger(QueuedNotificationManager.class);
88
89     /**
90      * Caps the maximum number of attempts to offer notification to a particular listener.  Each
91      * attempt window is 1 minute, so an offer times out after roughly 10 minutes.
92      */
93     private static final int MAX_NOTIFICATION_OFFER_MINUTES = 10;
94     private static final long GIVE_UP_NANOS = TimeUnit.MINUTES.toNanos(MAX_NOTIFICATION_OFFER_MINUTES);
95     private static final long TASK_WAIT_NANOS = TimeUnit.MILLISECONDS.toNanos(10);
96
97     private final ConcurrentMap<ListenerKey<L>, NotificationTask> listenerCache = new ConcurrentHashMap<>();
98     private final BatchedInvoker<L, N> listenerInvoker;
99     private final Executor executor;
100     private final String name;
101     private final int maxQueueCapacity;
102
103     private QueuedNotificationManager(final Executor executor, final BatchedInvoker<L, N> listenerInvoker,
104             final int maxQueueCapacity, final String name) {
105         checkArgument(maxQueueCapacity > 0, "Invalid maxQueueCapacity %s must be > 0", maxQueueCapacity);
106         this.executor = requireNonNull(executor);
107         this.listenerInvoker = requireNonNull(listenerInvoker);
108         this.maxQueueCapacity = maxQueueCapacity;
109         this.name = requireNonNull(name);
110     }
111
112     /**
113      * Constructor.
114      *
115      * @param executor the {@link Executor} to use for notification tasks
116      * @param listenerInvoker the {@link Invoker} to use for invoking listeners
117      * @param maxQueueCapacity the capacity of each listener queue
118      * @param name the name of this instance for logging info
119      *
120      * @deprecated Use {@link #create(Executor, BatchedInvoker, int, String)} instead.
121      */
122     @Deprecated
123     @SuppressWarnings("checkstyle:illegalCatch")
124     public QueuedNotificationManager(final Executor executor, final Invoker<L, N> listenerInvoker,
125             final int maxQueueCapacity, final String name) {
126         this(executor, (BatchedInvoker<L, N>)(listener, notifications) -> notifications.forEach(n -> {
127             try {
128                 listenerInvoker.invokeListener(listener, n);
129             } catch (Exception e) {
130                 LOG.error("{}: Error notifying listener {} with {}", name, listener, n, e);
131             }
132
133         }), maxQueueCapacity, name);
134         requireNonNull(listenerInvoker);
135     }
136
137     /**
138      * Create a new notification manager.
139      *
140      * @param executor the {@link Executor} to use for notification tasks
141      * @param listenerInvoker the {@link BatchedInvoker} to use for invoking listeners
142      * @param maxQueueCapacity the capacity of each listener queue
143      * @param name the name of this instance for logging info
144      */
145     public static <L, N> QueuedNotificationManager<L, N> create(final Executor executor,
146             final BatchedInvoker<L, N> listenerInvoker, final int maxQueueCapacity, final String name) {
147         return new QueuedNotificationManager<>(executor, listenerInvoker, maxQueueCapacity, name);
148     }
149
150     /**
151      * Returns the maximum listener queue capacity.
152      */
153     public int getMaxQueueCapacity() {
154         return maxQueueCapacity;
155     }
156
157     /**
158      * Returns the {@link Executor} to used for notification tasks.
159      */
160     public Executor getExecutor() {
161         return executor;
162     }
163
164     /* (non-Javadoc)
165      * @see org.opendaylight.yangtools.util.concurrent.NotificationManager#addNotification(L, N)
166      */
167     @Override
168     public void submitNotification(final L listener, final N notification) throws RejectedExecutionException {
169         if (notification != null) {
170             submitNotifications(listener, Collections.singletonList(notification));
171         }
172     }
173
174     /* (non-Javadoc)
175      * @see org.opendaylight.yangtools.util.concurrent.NotificationManager#submitNotifications(L, java.util.Collection)
176      */
177     @Override
178     public void submitNotifications(final L listener, final Iterable<N> notifications)
179             throws RejectedExecutionException {
180
181         if (notifications == null || listener == null) {
182             return;
183         }
184
185         LOG.trace("{}: submitNotifications for listener {}: {}", name, listener, notifications);
186
187         final ListenerKey<L> key = new ListenerKey<>(listener);
188
189         // Keep looping until we are either able to add a new NotificationTask or are able to
190         // add our notifications to an existing NotificationTask. Eventually one or the other
191         // will occur.
192         try {
193             Iterator<N> it = notifications.iterator();
194
195             while (true) {
196                 NotificationTask task = listenerCache.get(key);
197                 if (task == null) {
198                     // No task found, try to insert a new one
199                     final NotificationTask newTask = new NotificationTask(key, it);
200                     task = listenerCache.putIfAbsent(key, newTask);
201                     if (task == null) {
202                         // We were able to put our new task - now submit it to the executor and
203                         // we're done. If it throws a RejectedExecutionException, let that propagate
204                         // to the caller.
205                         runTask(listener, newTask);
206                         break;
207                     }
208
209                     // We have a racing task, hence we can continue, but we need to refresh our iterator from
210                     // the task.
211                     it = newTask.recoverItems();
212                 }
213
214                 final boolean completed = task.submitNotifications(it);
215                 if (!completed) {
216                     // Task is indicating it is exiting before it has consumed all the items and is exiting. Rather
217                     // than spinning on removal, we try to replace it.
218                     final NotificationTask newTask = new NotificationTask(key, it);
219                     if (listenerCache.replace(key, task, newTask)) {
220                         runTask(listener, newTask);
221                         break;
222                     }
223
224                     // We failed to replace the task, hence we need retry. Note we have to recover the items to be
225                     // published from the new task.
226                     it = newTask.recoverItems();
227                     LOG.debug("{}: retrying task queueing for {}", name, listener);
228                     continue;
229                 }
230
231                 // All notifications have either been delivered or we have timed out and warned about the ones we
232                 // have failed to deliver. In any case we are done here.
233                 break;
234             }
235         } catch (InterruptedException e) {
236             // We were interrupted trying to offer to the listener's queue. Somebody's probably
237             // telling us to quit.
238             LOG.warn("{}: Interrupted trying to add to {} listener's queue", name, listener);
239         }
240
241         LOG.trace("{}: submitNotifications done for listener {}", name, listener);
242     }
243
244     /**
245      * Returns {@link ListenerNotificationQueueStats} instances for each current listener
246      * notification task in progress.
247      */
248     public List<ListenerNotificationQueueStats> getListenerNotificationQueueStats() {
249         return listenerCache.values().stream().map(t -> new ListenerNotificationQueueStats(t.listenerKey.toString(),
250             t.size())).collect(Collectors.toList());
251     }
252
253     private void runTask(final L listener, final NotificationTask task) {
254         LOG.debug("{}: Submitting NotificationTask for listener {}", name, listener);
255         executor.execute(task);
256     }
257
258     /**
259      * Used as the listenerCache map key. We key by listener reference identity hashCode/equals.
260      * Since we don't know anything about the listener class implementations and we're mixing
261      * multiple listener class instances in the same map, this avoids any potential issue with an
262      * equals implementation that just blindly casts the other Object to compare instead of checking
263      * for instanceof.
264      */
265     private static final class ListenerKey<L> {
266         private final L listener;
267
268         ListenerKey(final L listener) {
269             this.listener = requireNonNull(listener);
270         }
271
272         L getListener() {
273             return listener;
274         }
275
276         @Override
277         public int hashCode() {
278             return System.identityHashCode(listener);
279         }
280
281         @Override
282         public boolean equals(final Object obj) {
283             if (obj == this) {
284                 return true;
285             }
286             return obj instanceof ListenerKey<?> && listener == ((ListenerKey<?>) obj).listener;
287         }
288
289         @Override
290         public String toString() {
291             return listener.toString();
292         }
293     }
294
295     /**
296      * Executor task for a single listener that queues notifications and sends them serially to the
297      * listener.
298      */
299     private class NotificationTask implements Runnable {
300
301         private final Lock lock = new ReentrantLock();
302         private final Condition notEmpty = lock.newCondition();
303         private final Condition notFull = lock.newCondition();
304         private final ListenerKey<L> listenerKey;
305
306         @GuardedBy("lock")
307         private final Queue<N> queue = new ArrayDeque<>();
308         @GuardedBy("lock")
309         private boolean exiting;
310
311         NotificationTask(final ListenerKey<L> listenerKey, final Iterator<N> notifications) {
312             this.listenerKey = requireNonNull(listenerKey);
313             while (notifications.hasNext()) {
314                 queue.add(notifications.next());
315             }
316         }
317
318         Iterator<N> recoverItems() {
319             // This violates @GuardedBy annotation, but is invoked only when the task is not started and will never
320             // get started, hence this is safe.
321             return queue.iterator();
322         }
323
324         int size() {
325             lock.lock();
326             try {
327                 return queue.size();
328             } finally {
329                 lock.unlock();
330             }
331         }
332
333         boolean submitNotifications(final Iterator<N> notifications) throws InterruptedException {
334             final long start = System.nanoTime();
335             final long deadline = start + GIVE_UP_NANOS;
336
337             lock.lock();
338             try {
339                 // Lock may have blocked for some time, we need to take that into account. We may have exceedded
340                 // the deadline, but that is unlikely and even in that case we can make some progress without further
341                 // blocking.
342                 long canWait = deadline - System.nanoTime();
343
344                 while (true) {
345                     // Check the exiting flag - if true then #run is in the process of exiting so return
346                     // false to indicate such. Otherwise, offer the notifications to the queue.
347                     if (exiting) {
348                         return false;
349                     }
350
351                     final int avail = maxQueueCapacity - queue.size();
352                     if (avail <= 0) {
353                         if (canWait <= 0) {
354                             LOG.warn("{}: Failed to offer notifications {} to the queue for listener {}. Exceeded"
355                                 + "maximum allowable time of {} minutes; the listener is likely in an unrecoverable"
356                                 + "state (deadlock or endless loop). ", name, ImmutableList.copyOf(notifications),
357                                 listenerKey, MAX_NOTIFICATION_OFFER_MINUTES);
358                             return true;
359                         }
360
361                         canWait = notFull.awaitNanos(canWait);
362                         continue;
363                     }
364
365                     for (int i = 0; i < avail; ++i) {
366                         if (!notifications.hasNext()) {
367                             notEmpty.signal();
368                             return true;
369                         }
370
371                         queue.add(notifications.next());
372                     }
373                 }
374             } finally {
375                 lock.unlock();
376             }
377         }
378
379         @GuardedBy("lock")
380         private boolean waitForQueue() {
381             long timeout = TASK_WAIT_NANOS;
382
383             while (queue.isEmpty()) {
384                 if (timeout <= 0) {
385                     return false;
386                 }
387
388                 try {
389                     timeout = notEmpty.awaitNanos(timeout);
390                 } catch (InterruptedException e) {
391                     // The executor is probably shutting down so log as debug.
392                     LOG.debug("{}: Interrupted trying to remove from {} listener's queue", name, listenerKey);
393                     return false;
394                 }
395             }
396
397             return true;
398         }
399
400         @Override
401         public void run() {
402             try {
403                 // Loop until we've dispatched all the notifications in the queue.
404                 while (true) {
405                     final Collection<N> notifications;
406
407                     lock.lock();
408                     try {
409                         if (!waitForQueue()) {
410                             exiting = true;
411                             break;
412                         }
413
414                         // Splice the entire queue
415                         notifications = ImmutableList.copyOf(queue);
416                         queue.clear();
417
418                         notFull.signalAll();
419                     } finally {
420                         lock.unlock();
421                     }
422
423                     invokeListener(notifications);
424                 }
425             } finally {
426                 // We're exiting, gracefully or not - either way make sure we always remove
427                 // ourselves from the cache.
428                 listenerCache.remove(listenerKey, this);
429             }
430         }
431
432         @SuppressWarnings("checkstyle:illegalCatch")
433         private void invokeListener(final Collection<N> notifications) {
434             LOG.debug("{}: Invoking listener {} with notification: {}", name, listenerKey, notifications);
435             try {
436                 listenerInvoker.invokeListener(listenerKey.getListener(), notifications);
437             } catch (Exception e) {
438                 // We'll let a RuntimeException from the listener slide and keep sending any remaining notifications.
439                 LOG.error("{}: Error notifying listener {} with {}", name, listenerKey, notifications, e);
440             }
441         }
442     }
443 }