Migrate common/util to use JDT annotations
[yangtools.git] / common / util / src / main / java / org / opendaylight / yangtools / util / concurrent / QueuedNotificationManager.java
1 /*
2  * Copyright (c) 2014 Brocade Communications Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.yangtools.util.concurrent;
9
10 import static com.google.common.base.Preconditions.checkArgument;
11 import static java.util.Objects.requireNonNull;
12
13 import com.google.common.collect.ImmutableList;
14 import java.util.ArrayDeque;
15 import java.util.Collection;
16 import java.util.Collections;
17 import java.util.Iterator;
18 import java.util.List;
19 import java.util.Queue;
20 import java.util.concurrent.ConcurrentHashMap;
21 import java.util.concurrent.ConcurrentMap;
22 import java.util.concurrent.Executor;
23 import java.util.concurrent.TimeUnit;
24 import java.util.concurrent.locks.Condition;
25 import java.util.concurrent.locks.Lock;
26 import java.util.concurrent.locks.ReentrantLock;
27 import java.util.stream.Collectors;
28 import javax.annotation.concurrent.GuardedBy;
29 import org.eclipse.jdt.annotation.NonNull;
30 import org.slf4j.Logger;
31 import org.slf4j.LoggerFactory;
32
33 /**
34  * This class manages queuing and dispatching notifications for multiple listeners concurrently.
35  * Notifications are queued on a per-listener basis and dispatched serially to each listener via an
36  * {@link Executor}.
37  *
38  * <p>This class optimizes its memory footprint by only allocating and maintaining a queue and executor
39  * task for a listener when there are pending notifications. On the first notification(s), a queue
40  * is created and a task is submitted to the executor to dispatch the queue to the associated
41  * listener. Any subsequent notifications that occur before all previous notifications have been
42  * dispatched are appended to the existing queue. When all notifications have been dispatched, the
43  * queue and task are discarded.
44  *
45  * @author Thomas Pantelis
46  *
47  * @param <L> the listener type
48  * @param <N> the notification type
49  */
50 public class QueuedNotificationManager<L, N> implements NotificationManager<L, N> {
51
52     /**
53      * Interface implemented by clients that does the work of invoking listeners with notifications.
54      *
55      * @author Thomas Pantelis
56      *
57      * @param <L> the listener type
58      * @param <N> the notification type
59      *
60      * @deprecated Use {@link QueuedNotificationManager.BatchedInvoker} instead.
61      */
62     @Deprecated
63     @FunctionalInterface
64     public interface Invoker<L, N> {
65         /**
66          * Called to invoke a listener with a notification.
67          *
68          * @param listener the listener to invoke
69          * @param notification the notification to send
70          */
71         void invokeListener(L listener, N notification);
72     }
73
74     @FunctionalInterface
75     public interface BatchedInvoker<L, N> {
76         /**
77          * Called to invoke a listener with a notification.
78          *
79          * @param listener the listener to invoke
80          * @param notifications notifications to send
81          */
82         void invokeListener(@NonNull L listener, @NonNull Collection<? extends N> notifications);
83     }
84
85     private static final Logger LOG = LoggerFactory.getLogger(QueuedNotificationManager.class);
86
87     /**
88      * Caps the maximum number of attempts to offer notification to a particular listener.  Each
89      * attempt window is 1 minute, so an offer times out after roughly 10 minutes.
90      */
91     private static final int MAX_NOTIFICATION_OFFER_MINUTES = 10;
92     private static final long GIVE_UP_NANOS = TimeUnit.MINUTES.toNanos(MAX_NOTIFICATION_OFFER_MINUTES);
93     private static final long TASK_WAIT_NANOS = TimeUnit.MILLISECONDS.toNanos(10);
94
95     private final ConcurrentMap<ListenerKey<L>, NotificationTask> listenerCache = new ConcurrentHashMap<>();
96     private final @NonNull BatchedInvoker<L, N> listenerInvoker;
97     private final @NonNull Executor executor;
98     private final @NonNull String name;
99     private final int maxQueueCapacity;
100
101     private QueuedNotificationManager(final @NonNull Executor executor,
102             final @NonNull BatchedInvoker<L, N> listenerInvoker, final int maxQueueCapacity,
103             final @NonNull String name) {
104         checkArgument(maxQueueCapacity > 0, "Invalid maxQueueCapacity %s must be > 0", maxQueueCapacity);
105         this.executor = requireNonNull(executor);
106         this.listenerInvoker = requireNonNull(listenerInvoker);
107         this.maxQueueCapacity = maxQueueCapacity;
108         this.name = requireNonNull(name);
109     }
110
111     /**
112      * Constructor.
113      *
114      * @param executor the {@link Executor} to use for notification tasks
115      * @param listenerInvoker the {@link Invoker} to use for invoking listeners
116      * @param maxQueueCapacity the capacity of each listener queue
117      * @param name the name of this instance for logging info
118      *
119      * @deprecated Use {@link #create(Executor, BatchedInvoker, int, String)} instead.
120      */
121     @Deprecated
122     @SuppressWarnings("checkstyle:illegalCatch")
123     public QueuedNotificationManager(final @NonNull Executor executor, final @NonNull Invoker<L, N> listenerInvoker,
124             final int maxQueueCapacity, final @NonNull String name) {
125         this(executor, (BatchedInvoker<L, N>)(listener, notifications) -> notifications.forEach(n -> {
126             try {
127                 listenerInvoker.invokeListener(listener, n);
128             } catch (Exception e) {
129                 LOG.error("{}: Error notifying listener {} with {}", name, listener, n, e);
130             }
131
132         }), maxQueueCapacity, name);
133         requireNonNull(listenerInvoker);
134     }
135
136     /**
137      * Create a new notification manager.
138      *
139      * @param executor the {@link Executor} to use for notification tasks
140      * @param listenerInvoker the {@link BatchedInvoker} to use for invoking listeners
141      * @param maxQueueCapacity the capacity of each listener queue
142      * @param name the name of this instance for logging info
143      */
144     public static <L, N> QueuedNotificationManager<L, N> create(final @NonNull Executor executor,
145             final@NonNull  BatchedInvoker<L, N> listenerInvoker, final int maxQueueCapacity,
146             final @NonNull String name) {
147         return new QueuedNotificationManager<>(executor, listenerInvoker, maxQueueCapacity, name);
148     }
149
150     /**
151      * Returns the maximum listener queue capacity.
152      */
153     public int getMaxQueueCapacity() {
154         return maxQueueCapacity;
155     }
156
157     /**
158      * Returns the {@link Executor} to used for notification tasks.
159      */
160     public @NonNull Executor getExecutor() {
161         return executor;
162     }
163
164     /* (non-Javadoc)
165      * @see org.opendaylight.yangtools.util.concurrent.NotificationManager#addNotification(L, N)
166      */
167     @Override
168     public void submitNotification(final L listener, final N notification) {
169         if (notification != null) {
170             submitNotifications(listener, Collections.singletonList(notification));
171         }
172     }
173
174     /* (non-Javadoc)
175      * @see org.opendaylight.yangtools.util.concurrent.NotificationManager#submitNotifications(L, java.util.Collection)
176      */
177     @Override
178     public void submitNotifications(final L listener, final Iterable<N> notifications) {
179
180         if (notifications == null || listener == null) {
181             return;
182         }
183
184         LOG.trace("{}: submitNotifications for listener {}: {}", name, listener, notifications);
185
186         final ListenerKey<L> key = new ListenerKey<>(listener);
187
188         // Keep looping until we are either able to add a new NotificationTask or are able to
189         // add our notifications to an existing NotificationTask. Eventually one or the other
190         // will occur.
191         try {
192             Iterator<N> it = notifications.iterator();
193
194             while (true) {
195                 NotificationTask task = listenerCache.get(key);
196                 if (task == null) {
197                     // No task found, try to insert a new one
198                     final NotificationTask newTask = new NotificationTask(key, it);
199                     task = listenerCache.putIfAbsent(key, newTask);
200                     if (task == null) {
201                         // We were able to put our new task - now submit it to the executor and
202                         // we're done. If it throws a RejectedExecutionException, let that propagate
203                         // to the caller.
204                         runTask(listener, newTask);
205                         break;
206                     }
207
208                     // We have a racing task, hence we can continue, but we need to refresh our iterator from
209                     // the task.
210                     it = newTask.recoverItems();
211                 }
212
213                 final boolean completed = task.submitNotifications(it);
214                 if (!completed) {
215                     // Task is indicating it is exiting before it has consumed all the items and is exiting. Rather
216                     // than spinning on removal, we try to replace it.
217                     final NotificationTask newTask = new NotificationTask(key, it);
218                     if (listenerCache.replace(key, task, newTask)) {
219                         runTask(listener, newTask);
220                         break;
221                     }
222
223                     // We failed to replace the task, hence we need retry. Note we have to recover the items to be
224                     // published from the new task.
225                     it = newTask.recoverItems();
226                     LOG.debug("{}: retrying task queueing for {}", name, listener);
227                     continue;
228                 }
229
230                 // All notifications have either been delivered or we have timed out and warned about the ones we
231                 // have failed to deliver. In any case we are done here.
232                 break;
233             }
234         } catch (InterruptedException e) {
235             // We were interrupted trying to offer to the listener's queue. Somebody's probably
236             // telling us to quit.
237             LOG.warn("{}: Interrupted trying to add to {} listener's queue", name, listener);
238         }
239
240         LOG.trace("{}: submitNotifications done for listener {}", name, listener);
241     }
242
243     /**
244      * Returns {@link ListenerNotificationQueueStats} instances for each current listener
245      * notification task in progress.
246      */
247     public List<ListenerNotificationQueueStats> getListenerNotificationQueueStats() {
248         return listenerCache.values().stream().map(t -> new ListenerNotificationQueueStats(t.listenerKey.toString(),
249             t.size())).collect(Collectors.toList());
250     }
251
252     private void runTask(final L listener, final NotificationTask task) {
253         LOG.debug("{}: Submitting NotificationTask for listener {}", name, listener);
254         executor.execute(task);
255     }
256
257     /**
258      * Used as the listenerCache map key. We key by listener reference identity hashCode/equals.
259      * Since we don't know anything about the listener class implementations and we're mixing
260      * multiple listener class instances in the same map, this avoids any potential issue with an
261      * equals implementation that just blindly casts the other Object to compare instead of checking
262      * for instanceof.
263      */
264     private static final class ListenerKey<L> {
265         private final @NonNull L listener;
266
267         ListenerKey(final L listener) {
268             this.listener = requireNonNull(listener);
269         }
270
271         @NonNull L getListener() {
272             return listener;
273         }
274
275         @Override
276         public int hashCode() {
277             return System.identityHashCode(listener);
278         }
279
280         @Override
281         public boolean equals(final Object obj) {
282             return obj == this || obj instanceof ListenerKey<?> && listener == ((ListenerKey<?>) obj).listener;
283         }
284
285         @Override
286         public String toString() {
287             return listener.toString();
288         }
289     }
290
291     /**
292      * Executor task for a single listener that queues notifications and sends them serially to the
293      * listener.
294      */
295     private class NotificationTask implements Runnable {
296         private final Lock lock = new ReentrantLock();
297         private final Condition notEmpty = lock.newCondition();
298         private final Condition notFull = lock.newCondition();
299         private final @NonNull ListenerKey<L> listenerKey;
300
301         @GuardedBy("lock")
302         private final Queue<N> queue = new ArrayDeque<>();
303         @GuardedBy("lock")
304         private boolean exiting;
305
306         NotificationTask(final @NonNull ListenerKey<L> listenerKey, final @NonNull Iterator<N> notifications) {
307             this.listenerKey = requireNonNull(listenerKey);
308             while (notifications.hasNext()) {
309                 queue.add(notifications.next());
310             }
311         }
312
313         @NonNull Iterator<N> recoverItems() {
314             // This violates @GuardedBy annotation, but is invoked only when the task is not started and will never
315             // get started, hence this is safe.
316             return queue.iterator();
317         }
318
319         int size() {
320             lock.lock();
321             try {
322                 return queue.size();
323             } finally {
324                 lock.unlock();
325             }
326         }
327
328         boolean submitNotifications(final @NonNull Iterator<N> notifications) throws InterruptedException {
329             final long start = System.nanoTime();
330             final long deadline = start + GIVE_UP_NANOS;
331
332             lock.lock();
333             try {
334                 // Lock may have blocked for some time, we need to take that into account. We may have exceedded
335                 // the deadline, but that is unlikely and even in that case we can make some progress without further
336                 // blocking.
337                 long canWait = deadline - System.nanoTime();
338
339                 while (true) {
340                     // Check the exiting flag - if true then #run is in the process of exiting so return
341                     // false to indicate such. Otherwise, offer the notifications to the queue.
342                     if (exiting) {
343                         return false;
344                     }
345
346                     final int avail = maxQueueCapacity - queue.size();
347                     if (avail <= 0) {
348                         if (canWait <= 0) {
349                             LOG.warn("{}: Failed to offer notifications {} to the queue for listener {}. Exceeded"
350                                 + "maximum allowable time of {} minutes; the listener is likely in an unrecoverable"
351                                 + "state (deadlock or endless loop). ", name, ImmutableList.copyOf(notifications),
352                                 listenerKey, MAX_NOTIFICATION_OFFER_MINUTES);
353                             return true;
354                         }
355
356                         canWait = notFull.awaitNanos(canWait);
357                         continue;
358                     }
359
360                     for (int i = 0; i < avail; ++i) {
361                         if (!notifications.hasNext()) {
362                             notEmpty.signal();
363                             return true;
364                         }
365
366                         queue.add(notifications.next());
367                     }
368                 }
369             } finally {
370                 lock.unlock();
371             }
372         }
373
374         @GuardedBy("lock")
375         private boolean waitForQueue() {
376             long timeout = TASK_WAIT_NANOS;
377
378             while (queue.isEmpty()) {
379                 if (timeout <= 0) {
380                     return false;
381                 }
382
383                 try {
384                     timeout = notEmpty.awaitNanos(timeout);
385                 } catch (InterruptedException e) {
386                     // The executor is probably shutting down so log as debug.
387                     LOG.debug("{}: Interrupted trying to remove from {} listener's queue", name, listenerKey);
388                     return false;
389                 }
390             }
391
392             return true;
393         }
394
395         @Override
396         public void run() {
397             try {
398                 // Loop until we've dispatched all the notifications in the queue.
399                 while (true) {
400                     final @NonNull Collection<N> notifications;
401
402                     lock.lock();
403                     try {
404                         if (!waitForQueue()) {
405                             exiting = true;
406                             break;
407                         }
408
409                         // Splice the entire queue
410                         notifications = ImmutableList.copyOf(queue);
411                         queue.clear();
412
413                         notFull.signalAll();
414                     } finally {
415                         lock.unlock();
416                     }
417
418                     invokeListener(notifications);
419                 }
420             } finally {
421                 // We're exiting, gracefully or not - either way make sure we always remove
422                 // ourselves from the cache.
423                 listenerCache.remove(listenerKey, this);
424             }
425         }
426
427         @SuppressWarnings("checkstyle:illegalCatch")
428         private void invokeListener(final @NonNull Collection<N> notifications) {
429             LOG.debug("{}: Invoking listener {} with notification: {}", name, listenerKey, notifications);
430             try {
431                 listenerInvoker.invokeListener(listenerKey.getListener(), notifications);
432             } catch (Exception e) {
433                 // We'll let a RuntimeException from the listener slide and keep sending any remaining notifications.
434                 LOG.error("{}: Error notifying listener {} with {}", name, listenerKey, notifications, e);
435             }
436         }
437     }
438 }