Do not require namespace repairing
[yangtools.git] / common / util / src / main / java / org / opendaylight / yangtools / util / concurrent / QueuedNotificationManager.java
1 /*
2  * Copyright (c) 2014 Brocade Communications Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.yangtools.util.concurrent;
10
11 import java.util.ArrayList;
12 import java.util.Arrays;
13 import java.util.List;
14 import java.util.concurrent.BlockingQueue;
15 import java.util.concurrent.ConcurrentHashMap;
16 import java.util.concurrent.ConcurrentMap;
17 import java.util.concurrent.Executor;
18 import java.util.concurrent.LinkedBlockingQueue;
19 import java.util.concurrent.RejectedExecutionException;
20 import java.util.concurrent.TimeUnit;
21 import java.util.concurrent.locks.Lock;
22 import java.util.concurrent.locks.ReentrantLock;
23
24 import javax.annotation.concurrent.GuardedBy;
25
26 import org.slf4j.Logger;
27 import org.slf4j.LoggerFactory;
28
29 import com.google.common.base.Preconditions;
30
31 /**
32  * This class manages queuing and dispatching notifications for multiple listeners concurrently.
33  * Notifications are queued on a per-listener basis and dispatched serially to each listener via an
34  * {@link Executor}.
35  * <p>
36  * This class optimizes its memory footprint by only allocating and maintaining a queue and executor
37  * task for a listener when there are pending notifications. On the first notification(s), a queue
38  * is created and a task is submitted to the executor to dispatch the queue to the associated
39  * listener. Any subsequent notifications that occur before all previous notifications have been
40  * dispatched are appended to the existing queue. When all notifications have been dispatched, the
41  * queue and task are discarded.
42  *
43  * @author Thomas Pantelis
44  *
45  * @param <L> the listener type
46  * @param <N> the notification type
47  */
48 public class QueuedNotificationManager<L,N> implements NotificationManager<L,N> {
49
50     /**
51      * Interface implemented by clients that does the work of invoking listeners with notifications.
52      *
53      * @author Thomas Pantelis
54      *
55      * @param <L> the listener type
56      * @param <N> the notification type
57      */
58     public interface Invoker<L,N> {
59
60         /**
61          * Called to invoke a listener with a notification.
62          *
63          * @param listener the listener to invoke
64          * @param notification the notification to send
65          */
66         void invokeListener( L listener, N notification );
67     }
68
69     private static final Logger LOG = LoggerFactory.getLogger( QueuedNotificationManager.class );
70
71     private final Executor executor;
72     private final Invoker<L,N> listenerInvoker;
73
74     private final ConcurrentMap<ListenerKey<L>,NotificationTask>
75                                                           listenerCache = new ConcurrentHashMap<>();
76
77     private final String name;
78     private final int maxQueueCapacity;
79
80     /**
81      * Constructor.
82      *
83      * @param executor the {@link Executor} to use for notification tasks
84      * @param listenerInvoker the {@link Invoker} to use for invoking listeners
85      * @param maxQueueCapacity the capacity of each listener queue
86      * @param name the name of this instance for logging info
87      */
88     public QueuedNotificationManager( Executor executor, Invoker<L,N> listenerInvoker,
89             int maxQueueCapacity, String name ) {
90         this.executor = Preconditions.checkNotNull( executor );
91         this.listenerInvoker = Preconditions.checkNotNull( listenerInvoker );
92         Preconditions.checkArgument( maxQueueCapacity > 0, "maxQueueCapacity must be > 0 " );
93         this.maxQueueCapacity = maxQueueCapacity;
94         this.name = Preconditions.checkNotNull( name );
95     }
96
97     /* (non-Javadoc)
98      * @see org.opendaylight.yangtools.util.concurrent.NotificationManager#addNotification(L, N)
99      */
100     @Override
101     public void submitNotification( final L listener, final N notification )
102             throws RejectedExecutionException {
103
104         if( notification == null ) {
105             return;
106         }
107
108         submitNotifications( listener, Arrays.asList( notification ) );
109     }
110
111     /* (non-Javadoc)
112      * @see org.opendaylight.yangtools.util.concurrent.NotificationManager#submitNotifications(L, java.util.Collection)
113      */
114     @Override
115     public void submitNotifications( final L listener, final Iterable<N> notifications )
116             throws RejectedExecutionException {
117
118         if( notifications == null || listener == null ) {
119             return;
120         }
121
122         if( LOG.isTraceEnabled() ) {
123             LOG.trace( "{}: submitNotifications for listener {}: {}",
124                        name, listener.getClass(), notifications );
125         }
126
127         ListenerKey<L> key = new ListenerKey<>( listener );
128         NotificationTask newNotificationTask = null;
129
130         // Keep looping until we are either able to add a new NotificationTask or are able to
131         // add our notifications to an existing NotificationTask. Eventually one or the other
132         // will occur.
133
134         try {
135             while( true ) {
136                 NotificationTask existingTask = listenerCache.get( key );
137
138                 if( existingTask == null || !existingTask.submitNotifications( notifications ) ) {
139
140                     // Either there's no existing task or we couldn't add our notifications to the
141                     // existing one because it's in the process of exiting and removing itself from
142                     // the cache. Either way try to put a new task in the cache. If we can't put
143                     // then either the existing one is still there and hasn't removed itself quite
144                     // yet or some other concurrent thread beat us to the put although this method
145                     // shouldn't be called concurrently for the same listener as that would violate
146                     // notification ordering. In any case loop back up and try again.
147
148                     if( newNotificationTask == null ) {
149                         newNotificationTask = new NotificationTask( key, notifications );
150                     }
151
152                     existingTask = listenerCache.putIfAbsent( key, newNotificationTask );
153                     if( existingTask == null ) {
154
155                         // We were able to put our new task - now submit it to the executor and
156                         // we're done. If it throws a RejectedxecutionException, let that propagate
157                         // to the caller.
158
159                         LOG.debug( "{}: Submitting NotificationTask for listener {}",
160                                    name, listener.getClass() );
161
162                         executor.execute( newNotificationTask );
163                         break;
164                     }
165                 } else {
166
167                     // We were able to add our notifications to an existing task so we're done.
168
169                     break;
170                 }
171             }
172         } catch( InterruptedException e ) {
173
174             // We were interrupted trying to offer to the listener's queue. Somebody's probably
175             // telling us to quit.
176
177             LOG.debug( "{}: Interrupted trying to add to {} listener's queue",
178                        name, listener.getClass() );
179         }
180
181         if( LOG.isTraceEnabled() ) {
182             LOG.trace( "{}: submitNotifications dine for listener {}",
183                        name, listener.getClass() );
184         }
185     }
186
187     /**
188      * Returns {@link ListenerNotificationQueueStats} instances for each current listener
189      * notification task in progress.
190      */
191     public List<ListenerNotificationQueueStats> getListenerNotificationQueueStats() {
192         List<ListenerNotificationQueueStats> statsList = new ArrayList<>( listenerCache.size() );
193         for( NotificationTask task: listenerCache.values() ) {
194             statsList.add( new ListenerNotificationQueueStats(
195                     task.listenerKey.getListener().getClass().getName(),
196                     task.notificationQueue.size() ) );
197         }
198
199         return statsList ;
200     }
201
202     /**
203      * Returns the maximum listener queue capacity.
204      */
205     public int getMaxQueueCapacity(){
206         return maxQueueCapacity;
207     }
208
209     /**
210      * Returns the {@link Executor} to used for notification tasks.
211      */
212     public Executor getExecutor(){
213         return executor;
214     }
215
216     /**
217      * Used as the listenerCache map key. We key by listener reference identity hashCode/equals.
218      * Since we don't know anything about the listener class implementations and we're mixing
219      * multiple listener class instances in the same map, this avoids any potential issue with an
220      * equals implementation that just blindly casts the other Object to compare instead of checking
221      * for instanceof.
222      */
223     private static class ListenerKey<L> {
224
225         private final L listener;
226
227         public ListenerKey( L listener ) {
228             this.listener = listener;
229         }
230
231         L getListener() {
232             return listener;
233         }
234
235         @Override
236         public int hashCode() {
237             return System.identityHashCode( listener );
238         }
239
240         @Override
241         public boolean equals( Object obj ) {
242             if (obj == this) {
243                 return true;
244             }
245             if (!(obj instanceof ListenerKey<?>)) {
246                 return false;
247             }
248
249             ListenerKey<?> other = (ListenerKey<?>) obj;
250             return listener == other.listener;
251         }
252     }
253
254     /**
255      * Executor task for a single listener that queues notifications and sends them serially to the
256      * listener.
257      */
258     private class NotificationTask implements Runnable {
259
260         private final BlockingQueue<N> notificationQueue;
261
262         private volatile boolean done = false;
263
264         @GuardedBy("queuingLock")
265         private boolean queuedNotifications = false;
266
267         private final Lock queuingLock = new ReentrantLock();
268
269         private final ListenerKey<L> listenerKey;
270
271         NotificationTask( ListenerKey<L> listenerKey, Iterable<N> notifications ) {
272
273             this.listenerKey = listenerKey;
274             this.notificationQueue = new LinkedBlockingQueue<>( maxQueueCapacity );
275
276             for( N notification: notifications ) {
277                 this.notificationQueue.add( notification );
278             }
279         }
280
281         boolean submitNotifications( Iterable<N> notifications ) throws InterruptedException {
282
283             queuingLock.lock();
284             try {
285
286                 // Check the done flag - if true then #run is in the process of exiting so return
287                 // false to indicate such. Otherwise, offer the notifications to the queue.
288
289                 if( done ) {
290                     return false;
291                 }
292
293                 for( N notification: notifications ) {
294
295                     while( true ) {
296
297                         // Try to offer for up to a minute and log a message if it times out.
298
299                         // FIXME: we loop forever to guarantee delivery however this leaves it open
300                         // for 1 rogue listener to bring everyone to a halt. Another option is to
301                         // limit the tries and give up after a while and drop the notification.
302                         // Given a reasonably large queue capacity and long timeout, if we still
303                         // can't queue then most likely the listener is an unrecoverable state
304                         // (deadlock or endless loop).
305
306                         if( LOG.isDebugEnabled() ) {
307                             LOG.debug( "{}: Offering notification to the queue for listener {}: {}",
308                                        name, listenerKey.getListener().getClass(), notification );
309                         }
310
311                         if( notificationQueue.offer( notification, 1, TimeUnit.MINUTES ) ) {
312                             break;
313                         }
314
315                         LOG.warn(
316                             "{}: Timed out trying to offer a notification to the queue for listener {}." +
317                             "The queue has reached its capacity of {}",
318                             name, listenerKey.getListener().getClass(), maxQueueCapacity );
319                     }
320                 }
321
322                 // Set the queuedNotifications flag to tell #run that we've just queued
323                 // notifications and not to exit yet, even if it thinks the queue is empty at this
324                 // point.
325
326                 queuedNotifications = true;
327
328             } finally {
329                 queuingLock.unlock();
330             }
331
332             return true;
333         }
334
335         @Override
336         public void run() {
337
338             try {
339                 // Loop until we've dispatched all the notifications in the queue.
340
341                 while( true ) {
342
343                     // Get the notification at the head of the queue, waiting a little bit for one
344                     // to get offered.
345
346                     N notification = notificationQueue.poll( 10, TimeUnit.MILLISECONDS );
347                     if( notification == null ) {
348
349                         // The queue is empty - try to get the queuingLock. If we can't get the lock
350                         // then #submitNotifications is in the process of offering to the queue so
351                         // we'll loop back up and poll the queue again.
352
353                         if( queuingLock.tryLock() ) {
354                             try {
355
356                                 // Check the queuedNotifications flag to see if #submitNotifications
357                                 // has offered new notification(s) to the queue. If so, loop back up
358                                 // and poll the queue again. Otherwise set done to true and exit.
359                                 // Once we set the done flag and unlock, calls to
360                                 // #submitNotifications will fail and a new task will be created.
361
362                                 if( !queuedNotifications ) {
363                                     done = true;
364                                     break;
365                                 }
366
367                                 // Clear the queuedNotifications flag so we'll try to exit the next
368                                 // time through the loop when the queue is empty.
369
370                                 queuedNotifications = false;
371
372                             } finally {
373                                 queuingLock.unlock();
374                             }
375                         }
376                     }
377
378                     notifyListener( notification );
379                 }
380             } catch( InterruptedException e ) {
381
382                 // The executor is probably shutting down so log as debug.
383                 LOG.debug( "{}: Interrupted trying to remove from {} listener's queue",
384                            name, listenerKey.getListener().getClass() );
385             } finally {
386
387                 // We're exiting, gracefully or not - either way make sure we always remove
388                 // ourselves from the cache.
389
390                 listenerCache.remove( listenerKey );
391             }
392         }
393
394         private void notifyListener( N notification ) {
395
396             if( notification == null ) {
397                 return;
398             }
399
400             try {
401
402                 if( LOG.isDebugEnabled() ) {
403                     LOG.debug( "{}: Invoking listener {} with notification: {}",
404                                name, listenerKey.getListener().getClass(), notification );
405                 }
406
407                 listenerInvoker.invokeListener( listenerKey.getListener(), notification );
408
409             } catch( RuntimeException e ) {
410
411                 // We'll let a RuntimeException from the listener slide and keep sending any
412                 // remaining notifications.
413
414                 LOG.error( String.format( "%1$s: Error notifying listener %2$s", name,
415                            listenerKey.getListener().getClass() ), e );
416
417             } catch( Error e ) {
418
419                 // A JVM Error is severe - best practice is to throw them up the chain. Set done to
420                 // true so no new notifications can be added to this task as we're about to bail.
421
422                 done = true;
423                 throw e;
424             }
425         }
426     }
427 }