Fix eclipse/checkstyle warnings
[yangtools.git] / common / util / src / main / java / org / opendaylight / yangtools / util / concurrent / QueuedNotificationManager.java
1 /*
2  * Copyright (c) 2014 Brocade Communications Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.yangtools.util.concurrent;
10
11 import com.google.common.base.Preconditions;
12 import com.google.common.collect.ImmutableList;
13 import java.util.ArrayDeque;
14 import java.util.Collection;
15 import java.util.Collections;
16 import java.util.Iterator;
17 import java.util.List;
18 import java.util.Queue;
19 import java.util.concurrent.ConcurrentHashMap;
20 import java.util.concurrent.ConcurrentMap;
21 import java.util.concurrent.Executor;
22 import java.util.concurrent.RejectedExecutionException;
23 import java.util.concurrent.TimeUnit;
24 import java.util.concurrent.locks.Condition;
25 import java.util.concurrent.locks.Lock;
26 import java.util.concurrent.locks.ReentrantLock;
27 import java.util.stream.Collectors;
28 import javax.annotation.Nonnull;
29 import javax.annotation.concurrent.GuardedBy;
30 import org.slf4j.Logger;
31 import org.slf4j.LoggerFactory;
32
33 /**
34  * This class manages queuing and dispatching notifications for multiple listeners concurrently.
35  * Notifications are queued on a per-listener basis and dispatched serially to each listener via an
36  * {@link Executor}.
37  *
38  * <p>This class optimizes its memory footprint by only allocating and maintaining a queue and executor
39  * task for a listener when there are pending notifications. On the first notification(s), a queue
40  * is created and a task is submitted to the executor to dispatch the queue to the associated
41  * listener. Any subsequent notifications that occur before all previous notifications have been
42  * dispatched are appended to the existing queue. When all notifications have been dispatched, the
43  * queue and task are discarded.
44  *
45  * @author Thomas Pantelis
46  *
47  * @param <L> the listener type
48  * @param <N> the notification type
49  */
50 public class QueuedNotificationManager<L, N> implements NotificationManager<L, N> {
51
52     /**
53      * Interface implemented by clients that does the work of invoking listeners with notifications.
54      *
55      * @author Thomas Pantelis
56      *
57      * @param <L> the listener type
58      * @param <N> the notification type
59      *
60      * @deprecated Use {@link QueuedNotificationManager.BatchedInvoker} instead.
61      */
62     @Deprecated
63     @FunctionalInterface
64     public interface Invoker<L, N> {
65         /**
66          * Called to invoke a listener with a notification.
67          *
68          * @param listener the listener to invoke
69          * @param notification the notification to send
70          */
71         void invokeListener(L listener, N notification);
72     }
73
74     @FunctionalInterface
75     public interface BatchedInvoker<L, N> {
76         /**
77          * Called to invoke a listener with a notification.
78          *
79          * @param listener the listener to invoke
80          * @param notifications notifications to send
81          */
82         void invokeListener(@Nonnull L listener, @Nonnull Collection<? extends N> notifications);
83     }
84
85     private static final Logger LOG = LoggerFactory.getLogger(QueuedNotificationManager.class);
86
87     /**
88      * Caps the maximum number of attempts to offer notification to a particular listener.  Each
89      * attempt window is 1 minute, so an offer times out after roughly 10 minutes.
90      */
91     private static final int MAX_NOTIFICATION_OFFER_MINUTES = 10;
92     private static final long GIVE_UP_NANOS = TimeUnit.MINUTES.toNanos(MAX_NOTIFICATION_OFFER_MINUTES);
93     private static final long TASK_WAIT_NANOS = TimeUnit.MILLISECONDS.toNanos(10);
94
95     private final ConcurrentMap<ListenerKey<L>, NotificationTask> listenerCache = new ConcurrentHashMap<>();
96     private final BatchedInvoker<L, N> listenerInvoker;
97     private final Executor executor;
98     private final String name;
99     private final int maxQueueCapacity;
100
101     private QueuedNotificationManager(final Executor executor, final BatchedInvoker<L, N> listenerInvoker,
102             final int maxQueueCapacity, final String name) {
103         Preconditions.checkArgument(maxQueueCapacity > 0, "Invalid maxQueueCapacity %s must be > 0", maxQueueCapacity);
104         this.executor = Preconditions.checkNotNull(executor);
105         this.listenerInvoker = Preconditions.checkNotNull(listenerInvoker);
106         this.maxQueueCapacity = maxQueueCapacity;
107         this.name = Preconditions.checkNotNull(name);
108     }
109
110     /**
111      * Constructor.
112      *
113      * @param executor the {@link Executor} to use for notification tasks
114      * @param listenerInvoker the {@link Invoker} to use for invoking listeners
115      * @param maxQueueCapacity the capacity of each listener queue
116      * @param name the name of this instance for logging info
117      *
118      * @deprecated Use {@link #create(Executor, BatchedInvoker, int, String)} instead.
119      */
120     @Deprecated
121     @SuppressWarnings("checkstyle:illegalCatch")
122     public QueuedNotificationManager(final Executor executor, final Invoker<L, N> listenerInvoker,
123             final int maxQueueCapacity, final String name) {
124         this(executor, (BatchedInvoker<L, N>)(listener, notifications) -> notifications.forEach(n -> {
125             try {
126                 listenerInvoker.invokeListener(listener, n);
127             } catch (Exception e) {
128                 LOG.error("{}: Error notifying listener {} with {}", name, listener, n, e);
129             }
130
131         }), maxQueueCapacity, name);
132         Preconditions.checkNotNull(listenerInvoker);
133     }
134
135     /**
136      * Create a new notification manager.
137      *
138      * @param executor the {@link Executor} to use for notification tasks
139      * @param listenerInvoker the {@link BatchedInvoker} to use for invoking listeners
140      * @param maxQueueCapacity the capacity of each listener queue
141      * @param name the name of this instance for logging info
142      */
143     public static <L, N> QueuedNotificationManager<L, N> create(final Executor executor,
144             final BatchedInvoker<L, N> listenerInvoker, final int maxQueueCapacity, final String name) {
145         return new QueuedNotificationManager<>(executor, listenerInvoker, maxQueueCapacity, name);
146     }
147
148     /**
149      * Returns the maximum listener queue capacity.
150      */
151     public int getMaxQueueCapacity() {
152         return maxQueueCapacity;
153     }
154
155     /**
156      * Returns the {@link Executor} to used for notification tasks.
157      */
158     public Executor getExecutor() {
159         return executor;
160     }
161
162     /* (non-Javadoc)
163      * @see org.opendaylight.yangtools.util.concurrent.NotificationManager#addNotification(L, N)
164      */
165     @Override
166     public void submitNotification(final L listener, final N notification) throws RejectedExecutionException {
167         if (notification != null) {
168             submitNotifications(listener, Collections.singletonList(notification));
169         }
170     }
171
172     /* (non-Javadoc)
173      * @see org.opendaylight.yangtools.util.concurrent.NotificationManager#submitNotifications(L, java.util.Collection)
174      */
175     @Override
176     public void submitNotifications(final L listener, final Iterable<N> notifications)
177             throws RejectedExecutionException {
178
179         if (notifications == null || listener == null) {
180             return;
181         }
182
183         LOG.trace("{}: submitNotifications for listener {}: {}", name, listener, notifications);
184
185         final ListenerKey<L> key = new ListenerKey<>(listener);
186
187         // Keep looping until we are either able to add a new NotificationTask or are able to
188         // add our notifications to an existing NotificationTask. Eventually one or the other
189         // will occur.
190         try {
191             Iterator<N> it = notifications.iterator();
192
193             while (true) {
194                 NotificationTask task = listenerCache.get(key);
195                 if (task == null) {
196                     // No task found, try to insert a new one
197                     final NotificationTask newTask = new NotificationTask(key, it);
198                     task = listenerCache.putIfAbsent(key, newTask);
199                     if (task == null) {
200                         // We were able to put our new task - now submit it to the executor and
201                         // we're done. If it throws a RejectedExecutionException, let that propagate
202                         // to the caller.
203                         runTask(listener, newTask);
204                         break;
205                     }
206
207                     // We have a racing task, hence we can continue, but we need to refresh our iterator from
208                     // the task.
209                     it = newTask.recoverItems();
210                 }
211
212                 final boolean completed = task.submitNotifications(it);
213                 if (!completed) {
214                     // Task is indicating it is exiting before it has consumed all the items and is exiting. Rather
215                     // than spinning on removal, we try to replace it.
216                     final NotificationTask newTask = new NotificationTask(key, it);
217                     if (listenerCache.replace(key, task, newTask)) {
218                         runTask(listener, newTask);
219                         break;
220                     }
221
222                     // We failed to replace the task, hence we need retry. Note we have to recover the items to be
223                     // published from the new task.
224                     it = newTask.recoverItems();
225                     LOG.debug("{}: retrying task queueing for {}", name, listener);
226                     continue;
227                 }
228
229                 // All notifications have either been delivered or we have timed out and warned about the ones we
230                 // have failed to deliver. In any case we are done here.
231                 break;
232             }
233         } catch (InterruptedException e) {
234             // We were interrupted trying to offer to the listener's queue. Somebody's probably
235             // telling us to quit.
236             LOG.warn("{}: Interrupted trying to add to {} listener's queue", name, listener);
237         }
238
239         LOG.trace("{}: submitNotifications done for listener {}", name, listener);
240     }
241
242     /**
243      * Returns {@link ListenerNotificationQueueStats} instances for each current listener
244      * notification task in progress.
245      */
246     public List<ListenerNotificationQueueStats> getListenerNotificationQueueStats() {
247         return listenerCache.values().stream().map(t -> new ListenerNotificationQueueStats(t.listenerKey.toString(),
248             t.size())).collect(Collectors.toList());
249     }
250
251     private void runTask(final L listener, final NotificationTask task) {
252         LOG.debug("{}: Submitting NotificationTask for listener {}", name, listener);
253         executor.execute(task);
254     }
255
256     /**
257      * Used as the listenerCache map key. We key by listener reference identity hashCode/equals.
258      * Since we don't know anything about the listener class implementations and we're mixing
259      * multiple listener class instances in the same map, this avoids any potential issue with an
260      * equals implementation that just blindly casts the other Object to compare instead of checking
261      * for instanceof.
262      */
263     private static final class ListenerKey<L> {
264         private final L listener;
265
266         ListenerKey(final L listener) {
267             this.listener = Preconditions.checkNotNull(listener);
268         }
269
270         L getListener() {
271             return listener;
272         }
273
274         @Override
275         public int hashCode() {
276             return System.identityHashCode(listener);
277         }
278
279         @Override
280         public boolean equals(final Object obj) {
281             if (obj == this) {
282                 return true;
283             }
284             return obj instanceof ListenerKey<?> && listener == ((ListenerKey<?>) obj).listener;
285         }
286
287         @Override
288         public String toString() {
289             return listener.toString();
290         }
291     }
292
293     /**
294      * Executor task for a single listener that queues notifications and sends them serially to the
295      * listener.
296      */
297     private class NotificationTask implements Runnable {
298
299         private final Lock lock = new ReentrantLock();
300         private final Condition notEmpty = lock.newCondition();
301         private final Condition notFull = lock.newCondition();
302         private final ListenerKey<L> listenerKey;
303
304         @GuardedBy("lock")
305         private final Queue<N> queue = new ArrayDeque<>();
306         @GuardedBy("lock")
307         private boolean exiting;
308
309         NotificationTask(final ListenerKey<L> listenerKey, final Iterator<N> notifications) {
310             this.listenerKey = Preconditions.checkNotNull(listenerKey);
311             while (notifications.hasNext()) {
312                 queue.offer(notifications.next());
313             }
314         }
315
316         Iterator<N> recoverItems() {
317             // This violates @GuardedBy annotation, but is invoked only when the task is not started and will never
318             // get started, hence this is safe.
319             return queue.iterator();
320         }
321
322         int size() {
323             lock.lock();
324             try {
325                 return queue.size();
326             } finally {
327                 lock.unlock();
328             }
329         }
330
331         boolean submitNotifications(final Iterator<N> notifications) throws InterruptedException {
332             final long start = System.nanoTime();
333             final long deadline = start + GIVE_UP_NANOS;
334
335             lock.lock();
336             try {
337                 // Lock may have blocked for some time, we need to take that into account. We may have exceedded
338                 // the deadline, but that is unlikely and even in that case we can make some progress without further
339                 // blocking.
340                 long canWait = deadline - System.nanoTime();
341
342                 while (true) {
343                     // Check the exiting flag - if true then #run is in the process of exiting so return
344                     // false to indicate such. Otherwise, offer the notifications to the queue.
345                     if (exiting) {
346                         return false;
347                     }
348
349                     final int avail = maxQueueCapacity - queue.size();
350                     if (avail <= 0) {
351                         if (canWait <= 0) {
352                             LOG.warn("{}: Failed to offer notifications {} to the queue for listener {}. Exceeded"
353                                 + "maximum allowable time of {} minutes; the listener is likely in an unrecoverable"
354                                 + "state (deadlock or endless loop). ", name, ImmutableList.copyOf(notifications),
355                                 listenerKey, MAX_NOTIFICATION_OFFER_MINUTES);
356                             return true;
357                         }
358
359                         canWait = notFull.awaitNanos(canWait);
360                         continue;
361                     }
362
363                     for (int i = 0; i < avail; ++i) {
364                         if (!notifications.hasNext()) {
365                             notEmpty.signal();
366                             return true;
367                         }
368
369                         queue.offer(notifications.next());
370                     }
371                 }
372             } finally {
373                 lock.unlock();
374             }
375         }
376
377         @GuardedBy("lock")
378         private boolean waitForQueue() {
379             long timeout = TASK_WAIT_NANOS;
380
381             while (queue.isEmpty()) {
382                 if (timeout <= 0) {
383                     return false;
384                 }
385
386                 try {
387                     timeout = notEmpty.awaitNanos(timeout);
388                 } catch (InterruptedException e) {
389                     // The executor is probably shutting down so log as debug.
390                     LOG.debug("{}: Interrupted trying to remove from {} listener's queue", name, listenerKey);
391                     return false;
392                 }
393             }
394
395             return true;
396         }
397
398         @Override
399         public void run() {
400             try {
401                 // Loop until we've dispatched all the notifications in the queue.
402                 while (true) {
403                     final Collection<N> notifications;
404
405                     lock.lock();
406                     try {
407                         if (!waitForQueue()) {
408                             exiting = true;
409                             break;
410                         }
411
412                         // Splice the entire queue
413                         notifications = ImmutableList.copyOf(queue);
414                         queue.clear();
415
416                         notFull.signalAll();
417                     } finally {
418                         lock.unlock();
419                     }
420
421                     invokeListener(notifications);
422                 }
423             } finally {
424                 // We're exiting, gracefully or not - either way make sure we always remove
425                 // ourselves from the cache.
426                 listenerCache.remove(listenerKey, this);
427             }
428         }
429
430         @SuppressWarnings("checkstyle:illegalCatch")
431         private void invokeListener(final Collection<N> notifications) {
432             LOG.debug("{}: Invoking listener {} with notification: {}", name, listenerKey, notifications);
433             try {
434                 listenerInvoker.invokeListener(listenerKey.getListener(), notifications);
435             } catch (Exception e) {
436                 // We'll let a RuntimeException from the listener slide and keep sending any remaining notifications.
437                 LOG.error("{}: Error notifying listener {} with {}", name, listenerKey, notifications, e);
438             }
439         }
440     }
441 }