Cleanup QueuedNotificationManager
[yangtools.git] / common / util / src / main / java / org / opendaylight / yangtools / util / concurrent / QueuedNotificationManager.java
1 /*
2  * Copyright (c) 2014 Brocade Communications Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.yangtools.util.concurrent;
10
11 import com.google.common.base.Preconditions;
12 import java.util.Collections;
13 import java.util.List;
14 import java.util.concurrent.BlockingQueue;
15 import java.util.concurrent.ConcurrentHashMap;
16 import java.util.concurrent.ConcurrentMap;
17 import java.util.concurrent.Executor;
18 import java.util.concurrent.LinkedBlockingQueue;
19 import java.util.concurrent.RejectedExecutionException;
20 import java.util.concurrent.TimeUnit;
21 import java.util.concurrent.locks.Lock;
22 import java.util.concurrent.locks.ReentrantLock;
23 import java.util.stream.Collectors;
24 import javax.annotation.concurrent.GuardedBy;
25 import org.slf4j.Logger;
26 import org.slf4j.LoggerFactory;
27
28 /**
29  * This class manages queuing and dispatching notifications for multiple listeners concurrently.
30  * Notifications are queued on a per-listener basis and dispatched serially to each listener via an
31  * {@link Executor}.
32  *
33  * <p>This class optimizes its memory footprint by only allocating and maintaining a queue and executor
34  * task for a listener when there are pending notifications. On the first notification(s), a queue
35  * is created and a task is submitted to the executor to dispatch the queue to the associated
36  * listener. Any subsequent notifications that occur before all previous notifications have been
37  * dispatched are appended to the existing queue. When all notifications have been dispatched, the
38  * queue and task are discarded.
39  *
40  * @author Thomas Pantelis
41  *
42  * @param <L> the listener type
43  * @param <N> the notification type
44  */
45 public class QueuedNotificationManager<L, N> implements NotificationManager<L, N> {
46
47     /**
48      * Interface implemented by clients that does the work of invoking listeners with notifications.
49      *
50      * @author Thomas Pantelis
51      *
52      * @param <L> the listener type
53      * @param <N> the notification type
54      */
55     public interface Invoker<L, N> {
56
57         /**
58          * Called to invoke a listener with a notification.
59          *
60          * @param listener the listener to invoke
61          * @param notification the notification to send
62          */
63         void invokeListener(L listener, N notification);
64     }
65
66     private static final Logger LOG = LoggerFactory.getLogger(QueuedNotificationManager.class);
67
68     /**
69      * Caps the maximum number of attempts to offer notification to a particular listener.  Each
70      * attempt window is 1 minute, so an offer times out after roughly 10 minutes.
71      */
72     private static final int MAX_NOTIFICATION_OFFER_ATTEMPTS = 10;
73
74     private final ConcurrentMap<ListenerKey<L>, NotificationTask> listenerCache = new ConcurrentHashMap<>();
75     private final Invoker<L, N> listenerInvoker;
76     private final Executor executor;
77     private final String name;
78     private final int maxQueueCapacity;
79
80     /**
81      * Constructor.
82      *
83      * @param executor the {@link Executor} to use for notification tasks
84      * @param listenerInvoker the {@link Invoker} to use for invoking listeners
85      * @param maxQueueCapacity the capacity of each listener queue
86      * @param name the name of this instance for logging info
87      */
88     public QueuedNotificationManager(final Executor executor, final Invoker<L, N> listenerInvoker,
89             final int maxQueueCapacity, final String name) {
90         Preconditions.checkArgument(maxQueueCapacity > 0, "Invalid maxQueueCapacity %s must be > 0", maxQueueCapacity);
91         this.executor = Preconditions.checkNotNull(executor);
92         this.listenerInvoker = Preconditions.checkNotNull(listenerInvoker);
93         this.maxQueueCapacity = maxQueueCapacity;
94         this.name = Preconditions.checkNotNull(name);
95     }
96
97     /* (non-Javadoc)
98      * @see org.opendaylight.yangtools.util.concurrent.NotificationManager#addNotification(L, N)
99      */
100     @Override
101     public void submitNotification(final L listener, final N notification) throws RejectedExecutionException {
102         if (notification != null) {
103             submitNotifications(listener, Collections.singletonList(notification));
104         }
105     }
106
107     /* (non-Javadoc)
108      * @see org.opendaylight.yangtools.util.concurrent.NotificationManager#submitNotifications(L, java.util.Collection)
109      */
110     @Override
111     public void submitNotifications(final L listener, final Iterable<N> notifications)
112             throws RejectedExecutionException {
113
114         if (notifications == null || listener == null) {
115             return;
116         }
117
118         LOG.trace("{}: submitNotifications for listener {}: {}", name, listener, notifications);
119
120         final ListenerKey<L> key = new ListenerKey<>(listener);
121
122         // Keep looping until we are either able to add a new NotificationTask or are able to
123         // add our notifications to an existing NotificationTask. Eventually one or the other
124         // will occur.
125         try {
126             NotificationTask newNotificationTask = null;
127
128             while (true) {
129                 final NotificationTask existingTask = listenerCache.get(key);
130                 if (existingTask != null && existingTask.submitNotifications(notifications)) {
131                     // We were able to add our notifications to an existing task so we're done.
132                     break;
133                 }
134
135                 // Either there's no existing task or we couldn't add our notifications to the
136                 // existing one because it's in the process of exiting and removing itself from
137                 // the cache. Either way try to put a new task in the cache. If we can't put
138                 // then either the existing one is still there and hasn't removed itself quite
139                 // yet or some other concurrent thread beat us to the put although this method
140                 // shouldn't be called concurrently for the same listener as that would violate
141                 // notification ordering. In any case loop back up and try again.
142
143                 if (newNotificationTask == null) {
144                     newNotificationTask = new NotificationTask(key, notifications);
145                 }
146                 final NotificationTask oldTask = listenerCache.putIfAbsent(key, newNotificationTask);
147                 if (oldTask == null) {
148                     // We were able to put our new task - now submit it to the executor and
149                     // we're done. If it throws a RejectedxecutionException, let that propagate
150                     // to the caller.
151
152                     LOG.debug("{}: Submitting NotificationTask for listener {}", name, listener);
153                     executor.execute(newNotificationTask);
154                     break;
155                 }
156
157                 LOG.debug("{}: retrying task queueing for {}", name, listener);
158             }
159         } catch (InterruptedException e) {
160             // We were interrupted trying to offer to the listener's queue. Somebody's probably
161             // telling us to quit.
162             LOG.warn("{}: Interrupted trying to add to {} listener's queue", name, listener);
163         }
164
165         LOG.trace("{}: submitNotifications dine for listener {}", name, listener);
166     }
167
168     /**
169      * Returns {@link ListenerNotificationQueueStats} instances for each current listener
170      * notification task in progress.
171      */
172     public List<ListenerNotificationQueueStats> getListenerNotificationQueueStats() {
173         return listenerCache.values().stream().map(t -> new ListenerNotificationQueueStats(t.listenerKey.toString(),
174             t.notificationQueue.size())).collect(Collectors.toList());
175     }
176
177     /**
178      * Returns the maximum listener queue capacity.
179      */
180     public int getMaxQueueCapacity() {
181         return maxQueueCapacity;
182     }
183
184     /**
185      * Returns the {@link Executor} to used for notification tasks.
186      */
187     public Executor getExecutor() {
188         return executor;
189     }
190
191     /**
192      * Used as the listenerCache map key. We key by listener reference identity hashCode/equals.
193      * Since we don't know anything about the listener class implementations and we're mixing
194      * multiple listener class instances in the same map, this avoids any potential issue with an
195      * equals implementation that just blindly casts the other Object to compare instead of checking
196      * for instanceof.
197      */
198     private static final class ListenerKey<L> {
199         private final L listener;
200
201         ListenerKey(final L listener) {
202             this.listener = Preconditions.checkNotNull(listener);
203         }
204
205         L getListener() {
206             return listener;
207         }
208
209         @Override
210         public int hashCode() {
211             return System.identityHashCode(listener);
212         }
213
214         @Override
215         public boolean equals(final Object obj) {
216             if (obj == this) {
217                 return true;
218             }
219             return (obj instanceof ListenerKey<?>) && listener == ((ListenerKey<?>) obj).listener;
220         }
221
222         @Override
223         public String toString() {
224             return listener.toString();
225         }
226     }
227
228     /**
229      * Executor task for a single listener that queues notifications and sends them serially to the
230      * listener.
231      */
232     private class NotificationTask implements Runnable {
233         private final Lock queuingLock = new ReentrantLock();
234         private final BlockingQueue<N> notificationQueue;
235         private final ListenerKey<L> listenerKey;
236
237         @GuardedBy("queuingLock")
238         private boolean queuedNotifications = false;
239         private volatile boolean done = false;
240
241         NotificationTask(final ListenerKey<L> listenerKey, final Iterable<N> notifications) {
242             this.listenerKey = Preconditions.checkNotNull(listenerKey);
243             this.notificationQueue = new LinkedBlockingQueue<>(maxQueueCapacity);
244
245             for (N notification: notifications) {
246                 this.notificationQueue.add(notification);
247             }
248         }
249
250         @GuardedBy("queuingLock")
251         private void publishNotification(final N notification) throws InterruptedException {
252             // The offer is attempted for up to 10 minutes, with a status message printed each minute
253             for (int notificationOfferAttempts = 0;
254                  notificationOfferAttempts < MAX_NOTIFICATION_OFFER_ATTEMPTS; notificationOfferAttempts++) {
255
256                 // Try to offer for up to a minute and log a message if it times out.
257                 LOG.debug("{}: Offering notification to the queue for listener {}: {}", name, listenerKey,
258                     notification);
259
260                 if (notificationQueue.offer(notification, 1, TimeUnit.MINUTES)) {
261                     return;
262                 }
263
264                 LOG.warn("{}: Timed out trying to offer a notification to the queue for listener {} "
265                         + "on attempt {} of {}. The queue has reached its capacity of {}", name, listenerKey,
266                         notificationOfferAttempts, MAX_NOTIFICATION_OFFER_ATTEMPTS, maxQueueCapacity);
267             }
268
269             LOG.warn("{}: Failed to offer a notification to the queue for listener {}. Exceeded max allowable attempts"
270                     + " of {} in {} minutes; the listener is likely in an unrecoverable state (deadlock or endless"
271                     + " loop).", name, listenerKey, MAX_NOTIFICATION_OFFER_ATTEMPTS, MAX_NOTIFICATION_OFFER_ATTEMPTS);
272         }
273
274         boolean submitNotifications(final Iterable<N> notifications) throws InterruptedException {
275
276             queuingLock.lock();
277             try {
278
279                 // Check the done flag - if true then #run is in the process of exiting so return
280                 // false to indicate such. Otherwise, offer the notifications to the queue.
281
282                 if (done) {
283                     return false;
284                 }
285
286                 for (N notification : notifications) {
287                     publishNotification(notification);
288                 }
289
290                 // Set the queuedNotifications flag to tell #run that we've just queued
291                 // notifications and not to exit yet, even if it thinks the queue is empty at this
292                 // point.
293
294                 queuedNotifications = true;
295             } finally {
296                 queuingLock.unlock();
297             }
298
299             return true;
300         }
301
302         @Override
303         public void run() {
304             try {
305                 // Loop until we've dispatched all the notifications in the queue.
306
307                 while (true) {
308                     // Get the notification at the head of the queue, waiting a little bit for one
309                     // to get offered.
310
311                     final N notification = notificationQueue.poll(10, TimeUnit.MILLISECONDS);
312                     if (notification == null) {
313
314                         // The queue is empty - try to get the queuingLock. If we can't get the lock
315                         // then #submitNotifications is in the process of offering to the queue so
316                         // we'll loop back up and poll the queue again.
317
318                         if (queuingLock.tryLock()) {
319                             try {
320
321                                 // Check the queuedNotifications flag to see if #submitNotifications
322                                 // has offered new notification(s) to the queue. If so, loop back up
323                                 // and poll the queue again. Otherwise set done to true and exit.
324                                 // Once we set the done flag and unlock, calls to
325                                 // #submitNotifications will fail and a new task will be created.
326
327                                 if (!queuedNotifications) {
328                                     done = true;
329                                     break;
330                                 }
331
332                                 // Clear the queuedNotifications flag so we'll try to exit the next
333                                 // time through the loop when the queue is empty.
334
335                                 queuedNotifications = false;
336
337                             } finally {
338                                 queuingLock.unlock();
339                             }
340                         }
341                     }
342
343                     notifyListener(notification);
344                 }
345             } catch (InterruptedException e) {
346                 // The executor is probably shutting down so log as debug.
347                 LOG.debug("{}: Interrupted trying to remove from {} listener's queue", name, listenerKey);
348             } finally {
349                 // We're exiting, gracefully or not - either way make sure we always remove
350                 // ourselves from the cache.
351                 listenerCache.remove(listenerKey, this);
352             }
353         }
354
355         private void notifyListener(final N notification) {
356             if (notification == null) {
357                 return;
358             }
359
360             LOG.debug("{}: Invoking listener {} with notification: {}", name, listenerKey, notification);
361             try {
362                 listenerInvoker.invokeListener(listenerKey.getListener(), notification);
363             } catch (Exception e) {
364                 // We'll let a RuntimeException from the listener slide and keep sending any remaining notifications.
365                 LOG.error(String.format("%1$s: Error notifying listener %2$s", name, listenerKey), e);
366             }
367         }
368     }
369 }