Expose QueuedNotificationManager statistics
[yangtools.git] / common / util / src / main / java / org / opendaylight / yangtools / util / concurrent / QueuedNotificationManager.java
1 /*
2  * Copyright (c) 2014 Brocade Communications Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.yangtools.util.concurrent;
9
10 import static com.google.common.base.Preconditions.checkArgument;
11 import static java.util.Objects.requireNonNull;
12
13 import com.google.common.collect.ImmutableList;
14 import java.util.ArrayDeque;
15 import java.util.Collections;
16 import java.util.Iterator;
17 import java.util.List;
18 import java.util.Queue;
19 import java.util.concurrent.ConcurrentHashMap;
20 import java.util.concurrent.ConcurrentMap;
21 import java.util.concurrent.Executor;
22 import java.util.concurrent.TimeUnit;
23 import java.util.concurrent.locks.Condition;
24 import java.util.concurrent.locks.Lock;
25 import java.util.concurrent.locks.ReentrantLock;
26 import java.util.stream.Collectors;
27 import org.checkerframework.checker.lock.qual.GuardedBy;
28 import org.eclipse.jdt.annotation.NonNull;
29 import org.slf4j.Logger;
30 import org.slf4j.LoggerFactory;
31
32 /**
33  * This class manages queuing and dispatching notifications for multiple listeners concurrently.
34  * Notifications are queued on a per-listener basis and dispatched serially to each listener via an
35  * {@link Executor}.
36  *
37  * <p>This class optimizes its memory footprint by only allocating and maintaining a queue and executor
38  * task for a listener when there are pending notifications. On the first notification(s), a queue
39  * is created and a task is submitted to the executor to dispatch the queue to the associated
40  * listener. Any subsequent notifications that occur before all previous notifications have been
41  * dispatched are appended to the existing queue. When all notifications have been dispatched, the
42  * queue and task are discarded.
43  *
44  * @author Thomas Pantelis
45  *
46  * @param <L> the listener type
47  * @param <N> the notification type
48  */
49 public final class QueuedNotificationManager<L, N> implements NotificationManager<L, N> {
50     @FunctionalInterface
51     public interface BatchedInvoker<L, N> {
52         /**
53          * Called to invoke a listener with a notification.
54          *
55          * @param listener the listener to invoke
56          * @param notifications notifications to send
57          */
58         void invokeListener(@NonNull L listener, @NonNull ImmutableList<N> notifications);
59     }
60
61     private static final Logger LOG = LoggerFactory.getLogger(QueuedNotificationManager.class);
62
63     /**
64      * Caps the maximum number of attempts to offer notification to a particular listener.  Each
65      * attempt window is 1 minute, so an offer times out after roughly 10 minutes.
66      */
67     private static final int MAX_NOTIFICATION_OFFER_MINUTES = 10;
68     private static final long GIVE_UP_NANOS = TimeUnit.MINUTES.toNanos(MAX_NOTIFICATION_OFFER_MINUTES);
69     private static final long TASK_WAIT_NANOS = TimeUnit.MILLISECONDS.toNanos(10);
70
71     private final ConcurrentMap<ListenerKey<L>, NotificationTask> listenerCache = new ConcurrentHashMap<>();
72     private final @NonNull QueuedNotificationManagerMXBean mxBean = new QueuedNotificationManagerMXBeanImpl(this);
73     private final @NonNull BatchedInvoker<L, N> listenerInvoker;
74     private final @NonNull Executor executor;
75     private final @NonNull String name;
76     private final int maxQueueCapacity;
77
78     private QueuedNotificationManager(final @NonNull Executor executor,
79             final @NonNull BatchedInvoker<L, N> listenerInvoker, final int maxQueueCapacity,
80             final @NonNull String name) {
81         checkArgument(maxQueueCapacity > 0, "Invalid maxQueueCapacity %s must be > 0", maxQueueCapacity);
82         this.executor = requireNonNull(executor);
83         this.listenerInvoker = requireNonNull(listenerInvoker);
84         this.maxQueueCapacity = maxQueueCapacity;
85         this.name = requireNonNull(name);
86     }
87
88     /**
89      * Create a new notification manager.
90      *
91      * @param executor the {@link Executor} to use for notification tasks
92      * @param listenerInvoker the {@link BatchedInvoker} to use for invoking listeners
93      * @param maxQueueCapacity the capacity of each listener queue
94      * @param name the name of this instance for logging info
95      */
96     public static <L, N> QueuedNotificationManager<L, N> create(final @NonNull Executor executor,
97             final@NonNull  BatchedInvoker<L, N> listenerInvoker, final int maxQueueCapacity,
98             final @NonNull String name) {
99         return new QueuedNotificationManager<>(executor, listenerInvoker, maxQueueCapacity, name);
100     }
101
102     /**
103      * Returns the maximum listener queue capacity.
104      */
105     public int getMaxQueueCapacity() {
106         return maxQueueCapacity;
107     }
108
109     /**
110      * Return an {@link QueuedNotificationManagerMXBean} tied to this instance.
111      *
112      * @return An QueuedNotificationManagerMXBean object.
113      */
114     public @NonNull QueuedNotificationManagerMXBean getMXBean() {
115         return mxBean;
116     }
117
118     /**
119      * Returns the {@link Executor} to used for notification tasks.
120      */
121     public @NonNull Executor getExecutor() {
122         return executor;
123     }
124
125     /* (non-Javadoc)
126      * @see org.opendaylight.yangtools.util.concurrent.NotificationManager#addNotification(L, N)
127      */
128     @Override
129     public void submitNotification(final L listener, final N notification) {
130         if (notification != null) {
131             submitNotifications(listener, Collections.singletonList(notification));
132         }
133     }
134
135     /* (non-Javadoc)
136      * @see org.opendaylight.yangtools.util.concurrent.NotificationManager#submitNotifications(L, java.util.Collection)
137      */
138     @Override
139     public void submitNotifications(final L listener, final Iterable<N> notifications) {
140
141         if (notifications == null || listener == null) {
142             return;
143         }
144
145         LOG.trace("{}: submitNotifications for listener {}: {}", name, listener, notifications);
146
147         final ListenerKey<L> key = new ListenerKey<>(listener);
148
149         // Keep looping until we are either able to add a new NotificationTask or are able to
150         // add our notifications to an existing NotificationTask. Eventually one or the other
151         // will occur.
152         try {
153             Iterator<N> it = notifications.iterator();
154
155             while (true) {
156                 NotificationTask task = listenerCache.get(key);
157                 if (task == null) {
158                     // No task found, try to insert a new one
159                     final NotificationTask newTask = new NotificationTask(key, it);
160                     task = listenerCache.putIfAbsent(key, newTask);
161                     if (task == null) {
162                         // We were able to put our new task - now submit it to the executor and
163                         // we're done. If it throws a RejectedExecutionException, let that propagate
164                         // to the caller.
165                         runTask(listener, newTask);
166                         break;
167                     }
168
169                     // We have a racing task, hence we can continue, but we need to refresh our iterator from
170                     // the task.
171                     it = newTask.recoverItems();
172                 }
173
174                 final boolean completed = task.submitNotifications(it);
175                 if (!completed) {
176                     // Task is indicating it is exiting before it has consumed all the items and is exiting. Rather
177                     // than spinning on removal, we try to replace it.
178                     final NotificationTask newTask = new NotificationTask(key, it);
179                     if (listenerCache.replace(key, task, newTask)) {
180                         runTask(listener, newTask);
181                         break;
182                     }
183
184                     // We failed to replace the task, hence we need retry. Note we have to recover the items to be
185                     // published from the new task.
186                     it = newTask.recoverItems();
187                     LOG.debug("{}: retrying task queueing for {}", name, listener);
188                     continue;
189                 }
190
191                 // All notifications have either been delivered or we have timed out and warned about the ones we
192                 // have failed to deliver. In any case we are done here.
193                 break;
194             }
195         } catch (InterruptedException e) {
196             // We were interrupted trying to offer to the listener's queue. Somebody's probably
197             // telling us to quit.
198             LOG.warn("{}: Interrupted trying to add to {} listener's queue", name, listener);
199         }
200
201         LOG.trace("{}: submitNotifications done for listener {}", name, listener);
202     }
203
204     /**
205      * Returns {@link ListenerNotificationQueueStats} instances for each current listener
206      * notification task in progress.
207      */
208     public List<ListenerNotificationQueueStats> getListenerNotificationQueueStats() {
209         return listenerCache.values().stream().map(t -> new ListenerNotificationQueueStats(t.listenerKey.toString(),
210             t.size())).collect(Collectors.toList());
211     }
212
213     private void runTask(final L listener, final NotificationTask task) {
214         LOG.debug("{}: Submitting NotificationTask for listener {}", name, listener);
215         executor.execute(task);
216     }
217
218     /**
219      * Used as the listenerCache map key. We key by listener reference identity hashCode/equals.
220      * Since we don't know anything about the listener class implementations and we're mixing
221      * multiple listener class instances in the same map, this avoids any potential issue with an
222      * equals implementation that just blindly casts the other Object to compare instead of checking
223      * for instanceof.
224      */
225     private static final class ListenerKey<L> {
226         private final @NonNull L listener;
227
228         ListenerKey(final L listener) {
229             this.listener = requireNonNull(listener);
230         }
231
232         @NonNull L getListener() {
233             return listener;
234         }
235
236         @Override
237         public int hashCode() {
238             return System.identityHashCode(listener);
239         }
240
241         @Override
242         public boolean equals(final Object obj) {
243             return obj == this || obj instanceof ListenerKey<?> && listener == ((ListenerKey<?>) obj).listener;
244         }
245
246         @Override
247         public String toString() {
248             return listener.toString();
249         }
250     }
251
252     /**
253      * Executor task for a single listener that queues notifications and sends them serially to the
254      * listener.
255      */
256     private class NotificationTask implements Runnable {
257         private final Lock lock = new ReentrantLock();
258         private final Condition notEmpty = lock.newCondition();
259         private final Condition notFull = lock.newCondition();
260         private final @NonNull ListenerKey<L> listenerKey;
261
262         @GuardedBy("lock")
263         private final Queue<N> queue = new ArrayDeque<>();
264         @GuardedBy("lock")
265         private boolean exiting;
266
267         NotificationTask(final @NonNull ListenerKey<L> listenerKey, final @NonNull Iterator<N> notifications) {
268             this.listenerKey = requireNonNull(listenerKey);
269             while (notifications.hasNext()) {
270                 queue.add(notifications.next());
271             }
272         }
273
274         @NonNull Iterator<N> recoverItems() {
275             // This violates @GuardedBy annotation, but is invoked only when the task is not started and will never
276             // get started, hence this is safe.
277             return queue.iterator();
278         }
279
280         int size() {
281             lock.lock();
282             try {
283                 return queue.size();
284             } finally {
285                 lock.unlock();
286             }
287         }
288
289         boolean submitNotifications(final @NonNull Iterator<N> notifications) throws InterruptedException {
290             final long start = System.nanoTime();
291             final long deadline = start + GIVE_UP_NANOS;
292
293             lock.lock();
294             try {
295                 // Lock may have blocked for some time, we need to take that into account. We may have exceedded
296                 // the deadline, but that is unlikely and even in that case we can make some progress without further
297                 // blocking.
298                 long canWait = deadline - System.nanoTime();
299
300                 while (true) {
301                     // Check the exiting flag - if true then #run is in the process of exiting so return
302                     // false to indicate such. Otherwise, offer the notifications to the queue.
303                     if (exiting) {
304                         return false;
305                     }
306
307                     final int avail = maxQueueCapacity - queue.size();
308                     if (avail <= 0) {
309                         if (canWait <= 0) {
310                             LOG.warn("{}: Failed to offer notifications {} to the queue for listener {}. Exceeded"
311                                 + "maximum allowable time of {} minutes; the listener is likely in an unrecoverable"
312                                 + "state (deadlock or endless loop). ", name, ImmutableList.copyOf(notifications),
313                                 listenerKey, MAX_NOTIFICATION_OFFER_MINUTES);
314                             return true;
315                         }
316
317                         canWait = notFull.awaitNanos(canWait);
318                         continue;
319                     }
320
321                     for (int i = 0; i < avail; ++i) {
322                         if (!notifications.hasNext()) {
323                             notEmpty.signal();
324                             return true;
325                         }
326
327                         queue.add(notifications.next());
328                     }
329                 }
330             } finally {
331                 lock.unlock();
332             }
333         }
334
335         @GuardedBy("lock")
336         private boolean waitForQueue() {
337             long timeout = TASK_WAIT_NANOS;
338
339             while (queue.isEmpty()) {
340                 if (timeout <= 0) {
341                     return false;
342                 }
343
344                 try {
345                     timeout = notEmpty.awaitNanos(timeout);
346                 } catch (InterruptedException e) {
347                     // The executor is probably shutting down so log as debug.
348                     LOG.debug("{}: Interrupted trying to remove from {} listener's queue", name, listenerKey);
349                     return false;
350                 }
351             }
352
353             return true;
354         }
355
356         @Override
357         public void run() {
358             try {
359                 // Loop until we've dispatched all the notifications in the queue.
360                 while (true) {
361                     final @NonNull ImmutableList<N> notifications;
362
363                     lock.lock();
364                     try {
365                         if (!waitForQueue()) {
366                             exiting = true;
367                             break;
368                         }
369
370                         // Splice the entire queue
371                         notifications = ImmutableList.copyOf(queue);
372                         queue.clear();
373
374                         notFull.signalAll();
375                     } finally {
376                         lock.unlock();
377                     }
378
379                     invokeListener(notifications);
380                 }
381             } finally {
382                 // We're exiting, gracefully or not - either way make sure we always remove
383                 // ourselves from the cache.
384                 listenerCache.remove(listenerKey, this);
385             }
386         }
387
388         @SuppressWarnings("checkstyle:illegalCatch")
389         private void invokeListener(final @NonNull ImmutableList<N> notifications) {
390             LOG.debug("{}: Invoking listener {} with notification: {}", name, listenerKey, notifications);
391             try {
392                 listenerInvoker.invokeListener(listenerKey.getListener(), notifications);
393             } catch (Exception e) {
394                 // We'll let a RuntimeException from the listener slide and keep sending any remaining notifications.
395                 LOG.error("{}: Error notifying listener {} with {}", name, listenerKey, notifications, e);
396             }
397         }
398     }
399 }