f4846f8fce7280185f9f9c5ad91ca42c97b880c4
[yangtools.git] / common / util / src / main / java / org / opendaylight / yangtools / util / concurrent / QueuedNotificationManager.java
1 /*
2  * Copyright (c) 2014 Brocade Communications Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.yangtools.util.concurrent;
10
11 import java.util.ArrayList;
12 import java.util.Collections;
13 import java.util.List;
14 import java.util.concurrent.BlockingQueue;
15 import java.util.concurrent.ConcurrentHashMap;
16 import java.util.concurrent.ConcurrentMap;
17 import java.util.concurrent.Executor;
18 import java.util.concurrent.LinkedBlockingQueue;
19 import java.util.concurrent.RejectedExecutionException;
20 import java.util.concurrent.TimeUnit;
21 import java.util.concurrent.locks.Lock;
22 import java.util.concurrent.locks.ReentrantLock;
23
24 import javax.annotation.concurrent.GuardedBy;
25
26 import org.slf4j.Logger;
27 import org.slf4j.LoggerFactory;
28
29 import com.google.common.base.Preconditions;
30
31 /**
32  * This class manages queuing and dispatching notifications for multiple listeners concurrently.
33  * Notifications are queued on a per-listener basis and dispatched serially to each listener via an
34  * {@link Executor}.
35  * <p>
36  * This class optimizes its memory footprint by only allocating and maintaining a queue and executor
37  * task for a listener when there are pending notifications. On the first notification(s), a queue
38  * is created and a task is submitted to the executor to dispatch the queue to the associated
39  * listener. Any subsequent notifications that occur before all previous notifications have been
40  * dispatched are appended to the existing queue. When all notifications have been dispatched, the
41  * queue and task are discarded.
42  *
43  * @author Thomas Pantelis
44  *
45  * @param <L> the listener type
46  * @param <N> the notification type
47  */
48 public class QueuedNotificationManager<L,N> implements NotificationManager<L,N> {
49
50     /**
51      * Interface implemented by clients that does the work of invoking listeners with notifications.
52      *
53      * @author Thomas Pantelis
54      *
55      * @param <L> the listener type
56      * @param <N> the notification type
57      */
58     public interface Invoker<L,N> {
59
60         /**
61          * Called to invoke a listener with a notification.
62          *
63          * @param listener the listener to invoke
64          * @param notification the notification to send
65          */
66         void invokeListener( L listener, N notification );
67     }
68
69     private static final Logger LOG = LoggerFactory.getLogger( QueuedNotificationManager.class );
70
71     /**
72      * Caps the maximum number of attempts to offer notification to a particular listener.  Each
73      * attempt window is 1 minute, so an offer times out after roughly 10 minutes.
74      */
75     private static final int MAX_NOTIFICATION_OFFER_ATTEMPTS = 10;
76
77     private final Executor executor;
78     private final Invoker<L,N> listenerInvoker;
79
80     private final ConcurrentMap<ListenerKey<L>,NotificationTask>
81                                                           listenerCache = new ConcurrentHashMap<>();
82
83     private final String name;
84     private final int maxQueueCapacity;
85
86     /**
87      * Constructor.
88      *
89      * @param executor the {@link Executor} to use for notification tasks
90      * @param listenerInvoker the {@link Invoker} to use for invoking listeners
91      * @param maxQueueCapacity the capacity of each listener queue
92      * @param name the name of this instance for logging info
93      */
94     public QueuedNotificationManager( Executor executor, Invoker<L,N> listenerInvoker,
95             int maxQueueCapacity, String name ) {
96         this.executor = Preconditions.checkNotNull( executor );
97         this.listenerInvoker = Preconditions.checkNotNull( listenerInvoker );
98         Preconditions.checkArgument( maxQueueCapacity > 0, "maxQueueCapacity must be > 0 " );
99         this.maxQueueCapacity = maxQueueCapacity;
100         this.name = Preconditions.checkNotNull( name );
101     }
102
103     /* (non-Javadoc)
104      * @see org.opendaylight.yangtools.util.concurrent.NotificationManager#addNotification(L, N)
105      */
106     @Override
107     public void submitNotification( final L listener, final N notification )
108             throws RejectedExecutionException {
109
110         if( notification == null ) {
111             return;
112         }
113
114         submitNotifications( listener, Collections.singletonList(notification));
115     }
116
117     /* (non-Javadoc)
118      * @see org.opendaylight.yangtools.util.concurrent.NotificationManager#submitNotifications(L, java.util.Collection)
119      */
120     @Override
121     public void submitNotifications( final L listener, final Iterable<N> notifications )
122             throws RejectedExecutionException {
123
124         if( notifications == null || listener == null ) {
125             return;
126         }
127
128         if( LOG.isTraceEnabled() ) {
129             LOG.trace( "{}: submitNotifications for listener {}: {}",
130                        name, listener.toString(), notifications );
131         }
132
133         ListenerKey<L> key = new ListenerKey<>( listener );
134         NotificationTask newNotificationTask = null;
135
136         // Keep looping until we are either able to add a new NotificationTask or are able to
137         // add our notifications to an existing NotificationTask. Eventually one or the other
138         // will occur.
139
140         try {
141             while( true ) {
142                 NotificationTask existingTask = listenerCache.get( key );
143
144                 if( existingTask == null || !existingTask.submitNotifications( notifications ) ) {
145
146                     // Either there's no existing task or we couldn't add our notifications to the
147                     // existing one because it's in the process of exiting and removing itself from
148                     // the cache. Either way try to put a new task in the cache. If we can't put
149                     // then either the existing one is still there and hasn't removed itself quite
150                     // yet or some other concurrent thread beat us to the put although this method
151                     // shouldn't be called concurrently for the same listener as that would violate
152                     // notification ordering. In any case loop back up and try again.
153
154                     if( newNotificationTask == null ) {
155                         newNotificationTask = new NotificationTask( key, notifications );
156                     }
157
158                     existingTask = listenerCache.putIfAbsent( key, newNotificationTask );
159                     if( existingTask == null ) {
160
161                         // We were able to put our new task - now submit it to the executor and
162                         // we're done. If it throws a RejectedxecutionException, let that propagate
163                         // to the caller.
164
165                         LOG.debug( "{}: Submitting NotificationTask for listener {}",
166                                    name, listener.toString() );
167
168                         executor.execute( newNotificationTask );
169                         break;
170                     }
171                 } else {
172
173                     // We were able to add our notifications to an existing task so we're done.
174
175                     break;
176                 }
177             }
178         } catch( InterruptedException e ) {
179
180             // We were interrupted trying to offer to the listener's queue. Somebody's probably
181             // telling us to quit.
182
183             LOG.debug( "{}: Interrupted trying to add to {} listener's queue",
184                        name, listener.toString() );
185         }
186
187         if( LOG.isTraceEnabled() ) {
188             LOG.trace( "{}: submitNotifications dine for listener {}",
189                        name, listener.toString() );
190         }
191     }
192
193     /**
194      * Returns {@link ListenerNotificationQueueStats} instances for each current listener
195      * notification task in progress.
196      */
197     public List<ListenerNotificationQueueStats> getListenerNotificationQueueStats() {
198         List<ListenerNotificationQueueStats> statsList = new ArrayList<>( listenerCache.size() );
199         for( NotificationTask task: listenerCache.values() ) {
200             statsList.add( new ListenerNotificationQueueStats(
201                     task.listenerKey.toString(), task.notificationQueue.size() ) );
202         }
203
204         return statsList ;
205     }
206
207     /**
208      * Returns the maximum listener queue capacity.
209      */
210     public int getMaxQueueCapacity(){
211         return maxQueueCapacity;
212     }
213
214     /**
215      * Returns the {@link Executor} to used for notification tasks.
216      */
217     public Executor getExecutor(){
218         return executor;
219     }
220
221     /**
222      * Used as the listenerCache map key. We key by listener reference identity hashCode/equals.
223      * Since we don't know anything about the listener class implementations and we're mixing
224      * multiple listener class instances in the same map, this avoids any potential issue with an
225      * equals implementation that just blindly casts the other Object to compare instead of checking
226      * for instanceof.
227      */
228     private static class ListenerKey<L> {
229
230         private final L listener;
231
232         public ListenerKey( L listener ) {
233             this.listener = listener;
234         }
235
236         L getListener() {
237             return listener;
238         }
239
240         @Override
241         public int hashCode() {
242             return System.identityHashCode( listener );
243         }
244
245         @Override
246         public boolean equals( Object obj ) {
247             if (obj == this) {
248                 return true;
249             }
250             if (!(obj instanceof ListenerKey<?>)) {
251                 return false;
252             }
253
254             ListenerKey<?> other = (ListenerKey<?>) obj;
255             return listener == other.listener;
256         }
257
258         @Override
259         public String toString() {
260             return listener.toString();
261         }
262     }
263
264     /**
265      * Executor task for a single listener that queues notifications and sends them serially to the
266      * listener.
267      */
268     private class NotificationTask implements Runnable {
269
270         private final BlockingQueue<N> notificationQueue;
271
272         private volatile boolean done = false;
273
274         @GuardedBy("queuingLock")
275         private boolean queuedNotifications = false;
276
277         private final Lock queuingLock = new ReentrantLock();
278
279         private final ListenerKey<L> listenerKey;
280
281         NotificationTask( ListenerKey<L> listenerKey, Iterable<N> notifications ) {
282
283             this.listenerKey = listenerKey;
284             this.notificationQueue = new LinkedBlockingQueue<>( maxQueueCapacity );
285
286             for( N notification: notifications ) {
287                 this.notificationQueue.add( notification );
288             }
289         }
290
291         boolean submitNotifications( Iterable<N> notifications ) throws InterruptedException {
292
293             queuingLock.lock();
294             try {
295
296                 // Check the done flag - if true then #run is in the process of exiting so return
297                 // false to indicate such. Otherwise, offer the notifications to the queue.
298
299                 if( done ) {
300                     return false;
301                 }
302
303                 for( N notification: notifications ) {
304                     boolean notificationOfferAttemptSuccess = false;
305                     // The offer is attempted for up to 10 minutes, with a status message printed each minute
306                     for (int notificationOfferAttempts = 0;
307                          notificationOfferAttempts < MAX_NOTIFICATION_OFFER_ATTEMPTS; notificationOfferAttempts++ ) {
308
309                         // Try to offer for up to a minute and log a message if it times out.
310                         if( LOG.isDebugEnabled() ) {
311                             LOG.debug( "{}: Offering notification to the queue for listener {}: {}",
312                                        name, listenerKey.toString(), notification );
313                         }
314
315                         if( notificationOfferAttemptSuccess = notificationQueue.offer(
316                                 notification, 1, TimeUnit.MINUTES ) ) {
317                             break;
318                         }
319
320                         LOG.warn(
321                             "{}: Timed out trying to offer a notification to the queue for listener {} " +
322                             "on attempt {} of {}. " +
323                             "The queue has reached its capacity of {}",
324                             name, listenerKey.toString(), notificationOfferAttempts, MAX_NOTIFICATION_OFFER_ATTEMPTS,
325                             maxQueueCapacity );
326                     }
327                     if (!notificationOfferAttemptSuccess) {
328                         LOG.warn(
329                             "{}: Failed to offer a notification to the queue for listener {}. " +
330                             "Exceeded max allowable attempts of {} in {} minutes; the listener " +
331                             "is likely in an unrecoverable state (deadlock or endless loop).",
332                             name, listenerKey.toString(), MAX_NOTIFICATION_OFFER_ATTEMPTS,
333                             MAX_NOTIFICATION_OFFER_ATTEMPTS );
334                     }
335                 }
336
337                 // Set the queuedNotifications flag to tell #run that we've just queued
338                 // notifications and not to exit yet, even if it thinks the queue is empty at this
339                 // point.
340
341                 queuedNotifications = true;
342
343             } finally {
344                 queuingLock.unlock();
345             }
346
347             return true;
348         }
349
350         @Override
351         public void run() {
352
353             try {
354                 // Loop until we've dispatched all the notifications in the queue.
355
356                 while( true ) {
357
358                     // Get the notification at the head of the queue, waiting a little bit for one
359                     // to get offered.
360
361                     N notification = notificationQueue.poll( 10, TimeUnit.MILLISECONDS );
362                     if( notification == null ) {
363
364                         // The queue is empty - try to get the queuingLock. If we can't get the lock
365                         // then #submitNotifications is in the process of offering to the queue so
366                         // we'll loop back up and poll the queue again.
367
368                         if( queuingLock.tryLock() ) {
369                             try {
370
371                                 // Check the queuedNotifications flag to see if #submitNotifications
372                                 // has offered new notification(s) to the queue. If so, loop back up
373                                 // and poll the queue again. Otherwise set done to true and exit.
374                                 // Once we set the done flag and unlock, calls to
375                                 // #submitNotifications will fail and a new task will be created.
376
377                                 if( !queuedNotifications ) {
378                                     done = true;
379                                     break;
380                                 }
381
382                                 // Clear the queuedNotifications flag so we'll try to exit the next
383                                 // time through the loop when the queue is empty.
384
385                                 queuedNotifications = false;
386
387                             } finally {
388                                 queuingLock.unlock();
389                             }
390                         }
391                     }
392
393                     notifyListener( notification );
394                 }
395             } catch( InterruptedException e ) {
396
397                 // The executor is probably shutting down so log as debug.
398                 LOG.debug( "{}: Interrupted trying to remove from {} listener's queue",
399                            name, listenerKey.toString() );
400             } finally {
401
402                 // We're exiting, gracefully or not - either way make sure we always remove
403                 // ourselves from the cache.
404
405                 listenerCache.remove( listenerKey );
406             }
407         }
408
409         private void notifyListener( N notification ) {
410
411             if( notification == null ) {
412                 return;
413             }
414
415             try {
416
417                 if( LOG.isDebugEnabled() ) {
418                     LOG.debug( "{}: Invoking listener {} with notification: {}",
419                                name, listenerKey.toString(), notification );
420                 }
421
422                 listenerInvoker.invokeListener( listenerKey.getListener(), notification );
423
424             } catch( RuntimeException e ) {
425
426                 // We'll let a RuntimeException from the listener slide and keep sending any
427                 // remaining notifications.
428
429                 LOG.error( String.format( "%1$s: Error notifying listener %2$s", name,
430                            listenerKey.toString() ), e );
431
432             } catch( Error e ) {
433
434                 // A JVM Error is severe - best practice is to throw them up the chain. Set done to
435                 // true so no new notifications can be added to this task as we're about to bail.
436
437                 done = true;
438                 throw e;
439             }
440         }
441     }
442 }