17ab583782de7db24706a6d07a9338aacf79372e
[yangtools.git] / common / util / src / main / java / org / opendaylight / yangtools / util / concurrent / QueuedNotificationManager.java
1 /*
2  * Copyright (c) 2014 Brocade Communications Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.yangtools.util.concurrent;
10
11 import java.util.ArrayList;
12 import java.util.Collections;
13 import java.util.List;
14 import java.util.concurrent.BlockingQueue;
15 import java.util.concurrent.ConcurrentHashMap;
16 import java.util.concurrent.ConcurrentMap;
17 import java.util.concurrent.Executor;
18 import java.util.concurrent.LinkedBlockingQueue;
19 import java.util.concurrent.RejectedExecutionException;
20 import java.util.concurrent.TimeUnit;
21 import java.util.concurrent.locks.Lock;
22 import java.util.concurrent.locks.ReentrantLock;
23
24 import javax.annotation.concurrent.GuardedBy;
25
26 import org.slf4j.Logger;
27 import org.slf4j.LoggerFactory;
28
29 import com.google.common.base.Preconditions;
30
31 /**
32  * This class manages queuing and dispatching notifications for multiple listeners concurrently.
33  * Notifications are queued on a per-listener basis and dispatched serially to each listener via an
34  * {@link Executor}.
35  * <p>
36  * This class optimizes its memory footprint by only allocating and maintaining a queue and executor
37  * task for a listener when there are pending notifications. On the first notification(s), a queue
38  * is created and a task is submitted to the executor to dispatch the queue to the associated
39  * listener. Any subsequent notifications that occur before all previous notifications have been
40  * dispatched are appended to the existing queue. When all notifications have been dispatched, the
41  * queue and task are discarded.
42  *
43  * @author Thomas Pantelis
44  *
45  * @param <L> the listener type
46  * @param <N> the notification type
47  */
48 public class QueuedNotificationManager<L,N> implements NotificationManager<L,N> {
49
50     /**
51      * Interface implemented by clients that does the work of invoking listeners with notifications.
52      *
53      * @author Thomas Pantelis
54      *
55      * @param <L> the listener type
56      * @param <N> the notification type
57      */
58     public interface Invoker<L,N> {
59
60         /**
61          * Called to invoke a listener with a notification.
62          *
63          * @param listener the listener to invoke
64          * @param notification the notification to send
65          */
66         void invokeListener( L listener, N notification );
67     }
68
69     private static final Logger LOG = LoggerFactory.getLogger( QueuedNotificationManager.class );
70
71     private final Executor executor;
72     private final Invoker<L,N> listenerInvoker;
73
74     private final ConcurrentMap<ListenerKey<L>,NotificationTask>
75                                                           listenerCache = new ConcurrentHashMap<>();
76
77     private final String name;
78     private final int maxQueueCapacity;
79
80     /**
81      * Constructor.
82      *
83      * @param executor the {@link Executor} to use for notification tasks
84      * @param listenerInvoker the {@link Invoker} to use for invoking listeners
85      * @param maxQueueCapacity the capacity of each listener queue
86      * @param name the name of this instance for logging info
87      */
88     public QueuedNotificationManager( Executor executor, Invoker<L,N> listenerInvoker,
89             int maxQueueCapacity, String name ) {
90         this.executor = Preconditions.checkNotNull( executor );
91         this.listenerInvoker = Preconditions.checkNotNull( listenerInvoker );
92         Preconditions.checkArgument( maxQueueCapacity > 0, "maxQueueCapacity must be > 0 " );
93         this.maxQueueCapacity = maxQueueCapacity;
94         this.name = Preconditions.checkNotNull( name );
95     }
96
97     /* (non-Javadoc)
98      * @see org.opendaylight.yangtools.util.concurrent.NotificationManager#addNotification(L, N)
99      */
100     @Override
101     public void submitNotification( final L listener, final N notification )
102             throws RejectedExecutionException {
103
104         if( notification == null ) {
105             return;
106         }
107
108         submitNotifications( listener, Collections.singletonList(notification));
109     }
110
111     /* (non-Javadoc)
112      * @see org.opendaylight.yangtools.util.concurrent.NotificationManager#submitNotifications(L, java.util.Collection)
113      */
114     @Override
115     public void submitNotifications( final L listener, final Iterable<N> notifications )
116             throws RejectedExecutionException {
117
118         if( notifications == null || listener == null ) {
119             return;
120         }
121
122         if( LOG.isTraceEnabled() ) {
123             LOG.trace( "{}: submitNotifications for listener {}: {}",
124                        name, listener.toString(), notifications );
125         }
126
127         ListenerKey<L> key = new ListenerKey<>( listener );
128         NotificationTask newNotificationTask = null;
129
130         // Keep looping until we are either able to add a new NotificationTask or are able to
131         // add our notifications to an existing NotificationTask. Eventually one or the other
132         // will occur.
133
134         try {
135             while( true ) {
136                 NotificationTask existingTask = listenerCache.get( key );
137
138                 if( existingTask == null || !existingTask.submitNotifications( notifications ) ) {
139
140                     // Either there's no existing task or we couldn't add our notifications to the
141                     // existing one because it's in the process of exiting and removing itself from
142                     // the cache. Either way try to put a new task in the cache. If we can't put
143                     // then either the existing one is still there and hasn't removed itself quite
144                     // yet or some other concurrent thread beat us to the put although this method
145                     // shouldn't be called concurrently for the same listener as that would violate
146                     // notification ordering. In any case loop back up and try again.
147
148                     if( newNotificationTask == null ) {
149                         newNotificationTask = new NotificationTask( key, notifications );
150                     }
151
152                     existingTask = listenerCache.putIfAbsent( key, newNotificationTask );
153                     if( existingTask == null ) {
154
155                         // We were able to put our new task - now submit it to the executor and
156                         // we're done. If it throws a RejectedxecutionException, let that propagate
157                         // to the caller.
158
159                         LOG.debug( "{}: Submitting NotificationTask for listener {}",
160                                    name, listener.toString() );
161
162                         executor.execute( newNotificationTask );
163                         break;
164                     }
165                 } else {
166
167                     // We were able to add our notifications to an existing task so we're done.
168
169                     break;
170                 }
171             }
172         } catch( InterruptedException e ) {
173
174             // We were interrupted trying to offer to the listener's queue. Somebody's probably
175             // telling us to quit.
176
177             LOG.debug( "{}: Interrupted trying to add to {} listener's queue",
178                        name, listener.toString() );
179         }
180
181         if( LOG.isTraceEnabled() ) {
182             LOG.trace( "{}: submitNotifications dine for listener {}",
183                        name, listener.toString() );
184         }
185     }
186
187     /**
188      * Returns {@link ListenerNotificationQueueStats} instances for each current listener
189      * notification task in progress.
190      */
191     public List<ListenerNotificationQueueStats> getListenerNotificationQueueStats() {
192         List<ListenerNotificationQueueStats> statsList = new ArrayList<>( listenerCache.size() );
193         for( NotificationTask task: listenerCache.values() ) {
194             statsList.add( new ListenerNotificationQueueStats(
195                     task.listenerKey.toString(), task.notificationQueue.size() ) );
196         }
197
198         return statsList ;
199     }
200
201     /**
202      * Returns the maximum listener queue capacity.
203      */
204     public int getMaxQueueCapacity(){
205         return maxQueueCapacity;
206     }
207
208     /**
209      * Returns the {@link Executor} to used for notification tasks.
210      */
211     public Executor getExecutor(){
212         return executor;
213     }
214
215     /**
216      * Used as the listenerCache map key. We key by listener reference identity hashCode/equals.
217      * Since we don't know anything about the listener class implementations and we're mixing
218      * multiple listener class instances in the same map, this avoids any potential issue with an
219      * equals implementation that just blindly casts the other Object to compare instead of checking
220      * for instanceof.
221      */
222     private static class ListenerKey<L> {
223
224         private final L listener;
225
226         public ListenerKey( L listener ) {
227             this.listener = listener;
228         }
229
230         L getListener() {
231             return listener;
232         }
233
234         @Override
235         public int hashCode() {
236             return System.identityHashCode( listener );
237         }
238
239         @Override
240         public boolean equals( Object obj ) {
241             if (obj == this) {
242                 return true;
243             }
244             if (!(obj instanceof ListenerKey<?>)) {
245                 return false;
246             }
247
248             ListenerKey<?> other = (ListenerKey<?>) obj;
249             return listener == other.listener;
250         }
251
252         @Override
253         public String toString() {
254             return listener.toString();
255         }
256     }
257
258     /**
259      * Executor task for a single listener that queues notifications and sends them serially to the
260      * listener.
261      */
262     private class NotificationTask implements Runnable {
263
264         private final BlockingQueue<N> notificationQueue;
265
266         private volatile boolean done = false;
267
268         @GuardedBy("queuingLock")
269         private boolean queuedNotifications = false;
270
271         private final Lock queuingLock = new ReentrantLock();
272
273         private final ListenerKey<L> listenerKey;
274
275         NotificationTask( ListenerKey<L> listenerKey, Iterable<N> notifications ) {
276
277             this.listenerKey = listenerKey;
278             this.notificationQueue = new LinkedBlockingQueue<>( maxQueueCapacity );
279
280             for( N notification: notifications ) {
281                 this.notificationQueue.add( notification );
282             }
283         }
284
285         boolean submitNotifications( Iterable<N> notifications ) throws InterruptedException {
286
287             queuingLock.lock();
288             try {
289
290                 // Check the done flag - if true then #run is in the process of exiting so return
291                 // false to indicate such. Otherwise, offer the notifications to the queue.
292
293                 if( done ) {
294                     return false;
295                 }
296
297                 for( N notification: notifications ) {
298
299                     while( true ) {
300
301                         // Try to offer for up to a minute and log a message if it times out.
302
303                         // FIXME: we loop forever to guarantee delivery however this leaves it open
304                         // for 1 rogue listener to bring everyone to a halt. Another option is to
305                         // limit the tries and give up after a while and drop the notification.
306                         // Given a reasonably large queue capacity and long timeout, if we still
307                         // can't queue then most likely the listener is an unrecoverable state
308                         // (deadlock or endless loop).
309
310                         if( LOG.isDebugEnabled() ) {
311                             LOG.debug( "{}: Offering notification to the queue for listener {}: {}",
312                                        name, listenerKey.toString(), notification );
313                         }
314
315                         if( notificationQueue.offer( notification, 1, TimeUnit.MINUTES ) ) {
316                             break;
317                         }
318
319                         LOG.warn(
320                             "{}: Timed out trying to offer a notification to the queue for listener {}." +
321                             "The queue has reached its capacity of {}",
322                             name, listenerKey.toString(), maxQueueCapacity );
323                     }
324                 }
325
326                 // Set the queuedNotifications flag to tell #run that we've just queued
327                 // notifications and not to exit yet, even if it thinks the queue is empty at this
328                 // point.
329
330                 queuedNotifications = true;
331
332             } finally {
333                 queuingLock.unlock();
334             }
335
336             return true;
337         }
338
339         @Override
340         public void run() {
341
342             try {
343                 // Loop until we've dispatched all the notifications in the queue.
344
345                 while( true ) {
346
347                     // Get the notification at the head of the queue, waiting a little bit for one
348                     // to get offered.
349
350                     N notification = notificationQueue.poll( 10, TimeUnit.MILLISECONDS );
351                     if( notification == null ) {
352
353                         // The queue is empty - try to get the queuingLock. If we can't get the lock
354                         // then #submitNotifications is in the process of offering to the queue so
355                         // we'll loop back up and poll the queue again.
356
357                         if( queuingLock.tryLock() ) {
358                             try {
359
360                                 // Check the queuedNotifications flag to see if #submitNotifications
361                                 // has offered new notification(s) to the queue. If so, loop back up
362                                 // and poll the queue again. Otherwise set done to true and exit.
363                                 // Once we set the done flag and unlock, calls to
364                                 // #submitNotifications will fail and a new task will be created.
365
366                                 if( !queuedNotifications ) {
367                                     done = true;
368                                     break;
369                                 }
370
371                                 // Clear the queuedNotifications flag so we'll try to exit the next
372                                 // time through the loop when the queue is empty.
373
374                                 queuedNotifications = false;
375
376                             } finally {
377                                 queuingLock.unlock();
378                             }
379                         }
380                     }
381
382                     notifyListener( notification );
383                 }
384             } catch( InterruptedException e ) {
385
386                 // The executor is probably shutting down so log as debug.
387                 LOG.debug( "{}: Interrupted trying to remove from {} listener's queue",
388                            name, listenerKey.toString() );
389             } finally {
390
391                 // We're exiting, gracefully or not - either way make sure we always remove
392                 // ourselves from the cache.
393
394                 listenerCache.remove( listenerKey );
395             }
396         }
397
398         private void notifyListener( N notification ) {
399
400             if( notification == null ) {
401                 return;
402             }
403
404             try {
405
406                 if( LOG.isDebugEnabled() ) {
407                     LOG.debug( "{}: Invoking listener {} with notification: {}",
408                                name, listenerKey.toString(), notification );
409                 }
410
411                 listenerInvoker.invokeListener( listenerKey.getListener(), notification );
412
413             } catch( RuntimeException e ) {
414
415                 // We'll let a RuntimeException from the listener slide and keep sending any
416                 // remaining notifications.
417
418                 LOG.error( String.format( "%1$s: Error notifying listener %2$s", name,
419                            listenerKey.toString() ), e );
420
421             } catch( Error e ) {
422
423                 // A JVM Error is severe - best practice is to throw them up the chain. Set done to
424                 // true so no new notifications can be added to this task as we're about to bail.
425
426                 done = true;
427                 throw e;
428             }
429         }
430     }
431 }