90783f2cdc32d06be07e47c907a7504bd543db67
[yangtools.git] / common / util / src / main / java / org / opendaylight / yangtools / util / concurrent / QueuedNotificationManager.java
1 /*
2  * Copyright (c) 2014 Brocade Communications Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.yangtools.util.concurrent;
10
11 import com.google.common.base.Preconditions;
12 import java.util.ArrayList;
13 import java.util.Collections;
14 import java.util.List;
15 import java.util.concurrent.BlockingQueue;
16 import java.util.concurrent.ConcurrentHashMap;
17 import java.util.concurrent.ConcurrentMap;
18 import java.util.concurrent.Executor;
19 import java.util.concurrent.LinkedBlockingQueue;
20 import java.util.concurrent.RejectedExecutionException;
21 import java.util.concurrent.TimeUnit;
22 import java.util.concurrent.locks.Lock;
23 import java.util.concurrent.locks.ReentrantLock;
24 import javax.annotation.concurrent.GuardedBy;
25 import org.slf4j.Logger;
26 import org.slf4j.LoggerFactory;
27
28 /**
29  * This class manages queuing and dispatching notifications for multiple listeners concurrently.
30  * Notifications are queued on a per-listener basis and dispatched serially to each listener via an
31  * {@link Executor}.
32  *
33  * <p>This class optimizes its memory footprint by only allocating and maintaining a queue and executor
34  * task for a listener when there are pending notifications. On the first notification(s), a queue
35  * is created and a task is submitted to the executor to dispatch the queue to the associated
36  * listener. Any subsequent notifications that occur before all previous notifications have been
37  * dispatched are appended to the existing queue. When all notifications have been dispatched, the
38  * queue and task are discarded.
39  *
40  * @author Thomas Pantelis
41  *
42  * @param <L> the listener type
43  * @param <N> the notification type
44  */
45 public class QueuedNotificationManager<L,N> implements NotificationManager<L,N> {
46
47     /**
48      * Interface implemented by clients that does the work of invoking listeners with notifications.
49      *
50      * @author Thomas Pantelis
51      *
52      * @param <L> the listener type
53      * @param <N> the notification type
54      */
55     public interface Invoker<L,N> {
56
57         /**
58          * Called to invoke a listener with a notification.
59          *
60          * @param listener the listener to invoke
61          * @param notification the notification to send
62          */
63         void invokeListener( L listener, N notification );
64     }
65
66     private static final Logger LOG = LoggerFactory.getLogger( QueuedNotificationManager.class );
67
68     /**
69      * Caps the maximum number of attempts to offer notification to a particular listener.  Each
70      * attempt window is 1 minute, so an offer times out after roughly 10 minutes.
71      */
72     private static final int MAX_NOTIFICATION_OFFER_ATTEMPTS = 10;
73
74     private final Executor executor;
75     private final Invoker<L,N> listenerInvoker;
76
77     private final ConcurrentMap<ListenerKey<L>,NotificationTask>
78                                                           listenerCache = new ConcurrentHashMap<>();
79
80     private final String name;
81     private final int maxQueueCapacity;
82
83     /**
84      * Constructor.
85      *
86      * @param executor the {@link Executor} to use for notification tasks
87      * @param listenerInvoker the {@link Invoker} to use for invoking listeners
88      * @param maxQueueCapacity the capacity of each listener queue
89      * @param name the name of this instance for logging info
90      */
91     public QueuedNotificationManager( Executor executor, Invoker<L,N> listenerInvoker,
92             int maxQueueCapacity, String name ) {
93         this.executor = Preconditions.checkNotNull( executor );
94         this.listenerInvoker = Preconditions.checkNotNull( listenerInvoker );
95         Preconditions.checkArgument( maxQueueCapacity > 0, "maxQueueCapacity must be > 0 " );
96         this.maxQueueCapacity = maxQueueCapacity;
97         this.name = Preconditions.checkNotNull( name );
98     }
99
100     /* (non-Javadoc)
101      * @see org.opendaylight.yangtools.util.concurrent.NotificationManager#addNotification(L, N)
102      */
103     @Override
104     public void submitNotification( final L listener, final N notification )
105             throws RejectedExecutionException {
106
107         if (notification == null) {
108             return;
109         }
110
111         submitNotifications( listener, Collections.singletonList(notification));
112     }
113
114     /* (non-Javadoc)
115      * @see org.opendaylight.yangtools.util.concurrent.NotificationManager#submitNotifications(L, java.util.Collection)
116      */
117     @Override
118     public void submitNotifications( final L listener, final Iterable<N> notifications )
119             throws RejectedExecutionException {
120
121         if (notifications == null || listener == null) {
122             return;
123         }
124
125         if (LOG.isTraceEnabled()) {
126             LOG.trace( "{}: submitNotifications for listener {}: {}",
127                        name, listener.toString(), notifications );
128         }
129
130         ListenerKey<L> key = new ListenerKey<>( listener );
131         NotificationTask newNotificationTask = null;
132
133         // Keep looping until we are either able to add a new NotificationTask or are able to
134         // add our notifications to an existing NotificationTask. Eventually one or the other
135         // will occur.
136
137         try {
138             while (true) {
139                 NotificationTask existingTask = listenerCache.get( key );
140
141                 if (existingTask == null || !existingTask.submitNotifications( notifications )) {
142
143                     // Either there's no existing task or we couldn't add our notifications to the
144                     // existing one because it's in the process of exiting and removing itself from
145                     // the cache. Either way try to put a new task in the cache. If we can't put
146                     // then either the existing one is still there and hasn't removed itself quite
147                     // yet or some other concurrent thread beat us to the put although this method
148                     // shouldn't be called concurrently for the same listener as that would violate
149                     // notification ordering. In any case loop back up and try again.
150
151                     if (newNotificationTask == null) {
152                         newNotificationTask = new NotificationTask( key, notifications );
153                     }
154
155                     existingTask = listenerCache.putIfAbsent( key, newNotificationTask );
156                     if (existingTask == null) {
157
158                         // We were able to put our new task - now submit it to the executor and
159                         // we're done. If it throws a RejectedxecutionException, let that propagate
160                         // to the caller.
161
162                         LOG.debug( "{}: Submitting NotificationTask for listener {}",
163                                    name, listener.toString() );
164
165                         executor.execute( newNotificationTask );
166                         break;
167                     }
168                 } else {
169
170                     // We were able to add our notifications to an existing task so we're done.
171
172                     break;
173                 }
174             }
175         } catch (InterruptedException e) {
176
177             // We were interrupted trying to offer to the listener's queue. Somebody's probably
178             // telling us to quit.
179
180             LOG.debug( "{}: Interrupted trying to add to {} listener's queue",
181                        name, listener.toString() );
182         }
183
184         if (LOG.isTraceEnabled()) {
185             LOG.trace( "{}: submitNotifications dine for listener {}",
186                        name, listener.toString() );
187         }
188     }
189
190     /**
191      * Returns {@link ListenerNotificationQueueStats} instances for each current listener
192      * notification task in progress.
193      */
194     public List<ListenerNotificationQueueStats> getListenerNotificationQueueStats() {
195         List<ListenerNotificationQueueStats> statsList = new ArrayList<>( listenerCache.size() );
196         for (NotificationTask task: listenerCache.values()) {
197             statsList.add( new ListenerNotificationQueueStats(
198                     task.listenerKey.toString(), task.notificationQueue.size() ) );
199         }
200
201         return statsList ;
202     }
203
204     /**
205      * Returns the maximum listener queue capacity.
206      */
207     public int getMaxQueueCapacity() {
208         return maxQueueCapacity;
209     }
210
211     /**
212      * Returns the {@link Executor} to used for notification tasks.
213      */
214     public Executor getExecutor() {
215         return executor;
216     }
217
218     /**
219      * Used as the listenerCache map key. We key by listener reference identity hashCode/equals.
220      * Since we don't know anything about the listener class implementations and we're mixing
221      * multiple listener class instances in the same map, this avoids any potential issue with an
222      * equals implementation that just blindly casts the other Object to compare instead of checking
223      * for instanceof.
224      */
225     private static class ListenerKey<L> {
226
227         private final L listener;
228
229         ListenerKey( L listener ) {
230             this.listener = listener;
231         }
232
233         L getListener() {
234             return listener;
235         }
236
237         @Override
238         public int hashCode() {
239             return System.identityHashCode( listener );
240         }
241
242         @Override
243         public boolean equals( Object obj ) {
244             if (obj == this) {
245                 return true;
246             }
247             if (!(obj instanceof ListenerKey<?>)) {
248                 return false;
249             }
250
251             ListenerKey<?> other = (ListenerKey<?>) obj;
252             return listener == other.listener;
253         }
254
255         @Override
256         public String toString() {
257             return listener.toString();
258         }
259     }
260
261     /**
262      * Executor task for a single listener that queues notifications and sends them serially to the
263      * listener.
264      */
265     private class NotificationTask implements Runnable {
266
267         private final BlockingQueue<N> notificationQueue;
268
269         private volatile boolean done = false;
270
271         @GuardedBy("queuingLock")
272         private boolean queuedNotifications = false;
273
274         private final Lock queuingLock = new ReentrantLock();
275
276         private final ListenerKey<L> listenerKey;
277
278         NotificationTask( ListenerKey<L> listenerKey, Iterable<N> notifications ) {
279
280             this.listenerKey = listenerKey;
281             this.notificationQueue = new LinkedBlockingQueue<>( maxQueueCapacity );
282
283             for (N notification: notifications) {
284                 this.notificationQueue.add( notification );
285             }
286         }
287
288         boolean submitNotifications( Iterable<N> notifications ) throws InterruptedException {
289
290             queuingLock.lock();
291             try {
292
293                 // Check the done flag - if true then #run is in the process of exiting so return
294                 // false to indicate such. Otherwise, offer the notifications to the queue.
295
296                 if (done) {
297                     return false;
298                 }
299
300                 for (N notification: notifications) {
301                     boolean notificationOfferAttemptSuccess = false;
302                     // The offer is attempted for up to 10 minutes, with a status message printed each minute
303                     for (int notificationOfferAttempts = 0;
304                          notificationOfferAttempts < MAX_NOTIFICATION_OFFER_ATTEMPTS; notificationOfferAttempts++) {
305
306                         // Try to offer for up to a minute and log a message if it times out.
307                         if (LOG.isDebugEnabled()) {
308                             LOG.debug( "{}: Offering notification to the queue for listener {}: {}",
309                                        name, listenerKey.toString(), notification );
310                         }
311
312                         if (notificationOfferAttemptSuccess = notificationQueue.offer(
313                                 notification, 1, TimeUnit.MINUTES)) {
314                             break;
315                         }
316
317                         LOG.warn(
318                                 "{}: Timed out trying to offer a notification to the queue for listener {} "
319                                         + "on attempt {} of {}. " + "The queue has reached its capacity of {}",
320                                 name, listenerKey.toString(), notificationOfferAttempts,
321                                 MAX_NOTIFICATION_OFFER_ATTEMPTS, maxQueueCapacity);
322                     }
323                     if (!notificationOfferAttemptSuccess) {
324                         LOG.warn(
325                                 "{}: Failed to offer a notification to the queue for listener {}. "
326                                         + "Exceeded max allowable attempts of {} in {} minutes; the listener "
327                                         + "is likely in an unrecoverable state (deadlock or endless loop).",
328                                 name, listenerKey.toString(), MAX_NOTIFICATION_OFFER_ATTEMPTS,
329                                 MAX_NOTIFICATION_OFFER_ATTEMPTS);
330                     }
331                 }
332
333                 // Set the queuedNotifications flag to tell #run that we've just queued
334                 // notifications and not to exit yet, even if it thinks the queue is empty at this
335                 // point.
336
337                 queuedNotifications = true;
338
339             } finally {
340                 queuingLock.unlock();
341             }
342
343             return true;
344         }
345
346         @Override
347         public void run() {
348
349             try {
350                 // Loop until we've dispatched all the notifications in the queue.
351
352                 while (true) {
353
354                     // Get the notification at the head of the queue, waiting a little bit for one
355                     // to get offered.
356
357                     N notification = notificationQueue.poll( 10, TimeUnit.MILLISECONDS );
358                     if (notification == null) {
359
360                         // The queue is empty - try to get the queuingLock. If we can't get the lock
361                         // then #submitNotifications is in the process of offering to the queue so
362                         // we'll loop back up and poll the queue again.
363
364                         if (queuingLock.tryLock()) {
365                             try {
366
367                                 // Check the queuedNotifications flag to see if #submitNotifications
368                                 // has offered new notification(s) to the queue. If so, loop back up
369                                 // and poll the queue again. Otherwise set done to true and exit.
370                                 // Once we set the done flag and unlock, calls to
371                                 // #submitNotifications will fail and a new task will be created.
372
373                                 if (!queuedNotifications) {
374                                     done = true;
375                                     break;
376                                 }
377
378                                 // Clear the queuedNotifications flag so we'll try to exit the next
379                                 // time through the loop when the queue is empty.
380
381                                 queuedNotifications = false;
382
383                             } finally {
384                                 queuingLock.unlock();
385                             }
386                         }
387                     }
388
389                     notifyListener( notification );
390                 }
391             } catch (InterruptedException e) {
392
393                 // The executor is probably shutting down so log as debug.
394                 LOG.debug( "{}: Interrupted trying to remove from {} listener's queue",
395                            name, listenerKey.toString() );
396             } finally {
397
398                 // We're exiting, gracefully or not - either way make sure we always remove
399                 // ourselves from the cache.
400
401                 listenerCache.remove( listenerKey );
402             }
403         }
404
405         private void notifyListener( N notification ) {
406
407             if (notification == null) {
408                 return;
409             }
410
411             try {
412
413                 if (LOG.isDebugEnabled()) {
414                     LOG.debug( "{}: Invoking listener {} with notification: {}",
415                                name, listenerKey.toString(), notification );
416                 }
417
418                 listenerInvoker.invokeListener( listenerKey.getListener(), notification );
419
420             } catch (RuntimeException e ) {
421
422                 // We'll let a RuntimeException from the listener slide and keep sending any
423                 // remaining notifications.
424
425                 LOG.error( String.format( "%1$s: Error notifying listener %2$s", name,
426                            listenerKey.toString() ), e );
427
428             } catch (Error e) {
429
430                 // A JVM Error is severe - best practice is to throw them up the chain. Set done to
431                 // true so no new notifications can be added to this task as we're about to bail.
432
433                 done = true;
434                 throw e;
435             }
436         }
437     }
438 }