Added tests for yang.model.util
[yangtools.git] / common / util / src / main / java / org / opendaylight / yangtools / util / concurrent / QueuedNotificationManager.java
1 /*
2  * Copyright (c) 2014 Brocade Communications Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.yangtools.util.concurrent;
10
11 import java.util.Arrays;
12 import java.util.concurrent.BlockingQueue;
13 import java.util.concurrent.ConcurrentHashMap;
14 import java.util.concurrent.ConcurrentMap;
15 import java.util.concurrent.Executor;
16 import java.util.concurrent.LinkedBlockingQueue;
17 import java.util.concurrent.RejectedExecutionException;
18 import java.util.concurrent.TimeUnit;
19 import java.util.concurrent.locks.Lock;
20 import java.util.concurrent.locks.ReentrantLock;
21
22 import javax.annotation.concurrent.GuardedBy;
23
24 import org.slf4j.Logger;
25 import org.slf4j.LoggerFactory;
26
27 import com.google.common.base.Preconditions;
28
29 /**
30  * This class manages queuing and dispatching notifications for multiple listeners concurrently.
31  * Notifications are queued on a per-listener basis and dispatched serially to each listener via an
32  * {@link Executor}.
33  * <p>
34  * This class optimizes its memory footprint by only allocating and maintaining a queue and executor
35  * task for a listener when there are pending notifications. On the first notification(s), a queue
36  * is created and a task is submitted to the executor to dispatch the queue to the associated
37  * listener. Any subsequent notifications that occur before all previous notifications have been
38  * dispatched are appended to the existing queue. When all notifications have been dispatched, the
39  * queue and task are discarded.
40  *
41  * @author Thomas Pantelis
42  *
43  * @param <L> the listener type
44  * @param <N> the notification type
45  */
46 public class QueuedNotificationManager<L,N> implements NotificationManager<L,N> {
47
48     /**
49      * Interface implemented by clients that does the work of invoking listeners with notifications.
50      *
51      * @author Thomas Pantelis
52      *
53      * @param <L> the listener type
54      * @param <N> the notification type
55      */
56     public interface Invoker<L,N> {
57
58         /**
59          * Called to invoke a listener with a notification.
60          *
61          * @param listener the listener to invoke
62          * @param notification the notification to send
63          */
64         void invokeListener( L listener, N notification );
65     }
66
67     private static final Logger LOG = LoggerFactory.getLogger( QueuedNotificationManager.class );
68
69     private final Executor executor;
70     private final Invoker<L,N> listenerInvoker;
71
72     private final ConcurrentMap<ListenerKey<L>,NotificationTask>
73                                                           listenerCache = new ConcurrentHashMap<>();
74
75     private final String name;
76     private final int maxQueueCapacity;
77
78     /**
79      * Constructor.
80      *
81      * @param executor the {@link Executor} to use for notification tasks
82      * @param listenerInvoker the {@link Invoker} to use for invoking listeners
83      * @param maxQueueCapacity the capacity of each listener queue
84      * @param name the name of this instance for logging info
85      */
86     public QueuedNotificationManager( Executor executor, Invoker<L,N> listenerInvoker,
87             int maxQueueCapacity, String name ) {
88         this.executor = Preconditions.checkNotNull( executor );
89         this.listenerInvoker = Preconditions.checkNotNull( listenerInvoker );
90         Preconditions.checkArgument( maxQueueCapacity > 0, "maxQueueCapacity must be > 0 " );
91         this.maxQueueCapacity = maxQueueCapacity;
92         this.name = Preconditions.checkNotNull( name );
93     }
94
95     /* (non-Javadoc)
96      * @see org.opendaylight.yangtools.util.concurrent.NotificationManager#addNotification(L, N)
97      */
98     @Override
99     public void submitNotification( final L listener, final N notification )
100             throws RejectedExecutionException {
101
102         if( notification == null ) {
103             return;
104         }
105
106         submitNotifications( listener, Arrays.asList( notification ) );
107     }
108
109     /* (non-Javadoc)
110      * @see org.opendaylight.yangtools.util.concurrent.NotificationManager#submitNotifications(L, java.util.Collection)
111      */
112     @Override
113     public void submitNotifications( final L listener, final Iterable<N> notifications )
114             throws RejectedExecutionException {
115
116         if( notifications == null || listener == null ) {
117             return;
118         }
119
120         if( LOG.isTraceEnabled() ) {
121             LOG.trace( "{}: submitNotifications for listener {}: {}",
122                        name, listener.getClass(), notifications );
123         }
124
125         ListenerKey<L> key = new ListenerKey<>( listener );
126         NotificationTask newNotificationTask = null;
127
128         // Keep looping until we are either able to add a new NotificationTask or are able to
129         // add our notifications to an existing NotificationTask. Eventually one or the other
130         // will occur.
131
132         try {
133             while( true ) {
134                 NotificationTask existingTask = listenerCache.get( key );
135
136                 if( existingTask == null || !existingTask.submitNotifications( notifications ) ) {
137
138                     // Either there's no existing task or we couldn't add our notifications to the
139                     // existing one because it's in the process of exiting and removing itself from
140                     // the cache. Either way try to put a new task in the cache. If we can't put
141                     // then either the existing one is still there and hasn't removed itself quite
142                     // yet or some other concurrent thread beat us to the put although this method
143                     // shouldn't be called concurrently for the same listener as that would violate
144                     // notification ordering. In any case loop back up and try again.
145
146                     if( newNotificationTask == null ) {
147                         newNotificationTask = new NotificationTask( key, notifications );
148                     }
149
150                     existingTask = listenerCache.putIfAbsent( key, newNotificationTask );
151                     if( existingTask == null ) {
152
153                         // We were able to put our new task - now submit it to the executor and
154                         // we're done. If it throws a RejectedxecutionException, let that propagate
155                         // to the caller.
156
157                         LOG.debug( "{}: Submitting NotificationTask for listener {}",
158                                    name, listener.getClass() );
159
160                         executor.execute( newNotificationTask );
161                         break;
162                     }
163                 } else {
164
165                     // We were able to add our notifications to an existing task so we're done.
166
167                     break;
168                 }
169             }
170         } catch( InterruptedException e ) {
171
172             // We were interrupted trying to offer to the listener's queue. Somebody's probably
173             // telling us to quit.
174
175             LOG.debug( "{}: Interrupted trying to add to {} listener's queue",
176                        name, listener.getClass() );
177         }
178
179         if( LOG.isTraceEnabled() ) {
180             LOG.trace( "{}: submitNotifications dine for listener {}",
181                        name, listener.getClass() );
182         }
183     }
184
185     /**
186      * Used as the listenerCache map key. We key by listener reference identity hashCode/equals.
187      * Since we don't know anything about the listener class implementations and we're mixing
188      * multiple listener class instances in the same map, this avoids any potential issue with an
189      * equals implementation that just blindly casts the other Object to compare instead of checking
190      * for instanceof.
191      */
192     private static class ListenerKey<L> {
193
194         private final L listener;
195
196         public ListenerKey( L listener ) {
197             this.listener = listener;
198         }
199
200         L getListener() {
201             return listener;
202         }
203
204         @Override
205         public int hashCode() {
206             return System.identityHashCode( listener );
207         }
208
209         @Override
210         public boolean equals( Object obj ) {
211             if (obj == this) {
212                 return true;
213             }
214             if (!(obj instanceof ListenerKey<?>)) {
215                 return false;
216             }
217
218             ListenerKey<?> other = (ListenerKey<?>) obj;
219             return listener == other.listener;
220         }
221     }
222
223     /**
224      * Executor task for a single listener that queues notifications and sends them serially to the
225      * listener.
226      */
227     private class NotificationTask implements Runnable {
228
229         private final BlockingQueue<N> notificationQueue;
230
231         private volatile boolean done = false;
232
233         @GuardedBy("queuingLock")
234         private boolean queuedNotifications = false;
235
236         private final Lock queuingLock = new ReentrantLock();
237
238         private final ListenerKey<L> listenerKey;
239
240         NotificationTask( ListenerKey<L> listenerKey, Iterable<N> notifications ) {
241
242             this.listenerKey = listenerKey;
243             this.notificationQueue = new LinkedBlockingQueue<>( maxQueueCapacity );
244
245             for( N notification: notifications ) {
246                 this.notificationQueue.add( notification );
247             }
248         }
249
250         boolean submitNotifications( Iterable<N> notifications ) throws InterruptedException {
251
252             queuingLock.lock();
253             try {
254
255                 // Check the done flag - if true then #run is in the process of exiting so return
256                 // false to indicate such. Otherwise, offer the notifications to the queue.
257
258                 if( done ) {
259                     return false;
260                 }
261
262                 for( N notification: notifications ) {
263
264                     while( true ) {
265
266                         // Try to offer for up to a minute and log a message if it times out.
267
268                         // FIXME: we loop forever to guarantee delivery however this leaves it open
269                         // for 1 rogue listener to bring everyone to a halt. Another option is to
270                         // limit the tries and give up after a while and drop the notification.
271                         // Given a reasonably large queue capacity and long timeout, if we still
272                         // can't queue then most likely the listener is an unrecoverable state
273                         // (deadlock or endless loop).
274
275                         if( LOG.isDebugEnabled() ) {
276                             LOG.debug( "{}: Offering notification to the queue for listener {}: {}",
277                                        name, listenerKey.getListener().getClass(), notification );
278                         }
279
280                         if( notificationQueue.offer( notification, 1, TimeUnit.MINUTES ) ) {
281                             break;
282                         }
283
284                         LOG.warn(
285                             "{}: Timed out trying to offer a notification to the queue for listener {}." +
286                             "The queue has reached its capacity of {}",
287                             name, listenerKey.getListener().getClass(), maxQueueCapacity );
288                     }
289                 }
290
291                 // Set the queuedNotifications flag to tell #run that we've just queued
292                 // notifications and not to exit yet, even if it thinks the queue is empty at this
293                 // point.
294
295                 queuedNotifications = true;
296
297             } finally {
298                 queuingLock.unlock();
299             }
300
301             return true;
302         }
303
304         @Override
305         public void run() {
306
307             try {
308                 // Loop until we've dispatched all the notifications in the queue.
309
310                 while( true ) {
311
312                     // Get the notification at the head of the queue, waiting a little bit for one
313                     // to get offered.
314
315                     N notification = notificationQueue.poll( 10, TimeUnit.MILLISECONDS );
316                     if( notification == null ) {
317
318                         // The queue is empty - try to get the queuingLock. If we can't get the lock
319                         // then #submitNotifications is in the process of offering to the queue so
320                         // we'll loop back up and poll the queue again.
321
322                         if( queuingLock.tryLock() ) {
323                             try {
324
325                                 // Check the queuedNotifications flag to see if #submitNotifications
326                                 // has offered new notification(s) to the queue. If so, loop back up
327                                 // and poll the queue again. Otherwise set done to true and exit.
328                                 // Once we set the done flag and unlock, calls to
329                                 // #submitNotifications will fail and a new task will be created.
330
331                                 if( !queuedNotifications ) {
332                                     done = true;
333                                     break;
334                                 }
335
336                                 // Clear the queuedNotifications flag so we'll try to exit the next
337                                 // time through the loop when the queue is empty.
338
339                                 queuedNotifications = false;
340
341                             } finally {
342                                 queuingLock.unlock();
343                             }
344                         }
345                     }
346
347                     notifyListener( notification );
348                 }
349             } catch( InterruptedException e ) {
350
351                 // The executor is probably shutting down so log as debug.
352                 LOG.debug( "{}: Interrupted trying to remove from {} listener's queue",
353                            name, listenerKey.getListener().getClass() );
354             } finally {
355
356                 // We're exiting, gracefully or not - either way make sure we always remove
357                 // ourselves from the cache.
358
359                 listenerCache.remove( listenerKey );
360             }
361         }
362
363         private void notifyListener( N notification ) {
364
365             if( notification == null ) {
366                 return;
367             }
368
369             try {
370
371                 if( LOG.isDebugEnabled() ) {
372                     LOG.debug( "{}: Invoking listener {} with notification: {}",
373                                name, listenerKey.getListener().getClass(), notification );
374                 }
375
376                 listenerInvoker.invokeListener( listenerKey.getListener(), notification );
377
378             } catch( RuntimeException e ) {
379
380                 // We'll let a RuntimeException from the listener slide and keep sending any
381                 // remaining notifications.
382
383                 LOG.error( String.format( "%1$s: Error notifying listener %2$s", name,
384                            listenerKey.getListener().getClass() ), e );
385
386             } catch( Error e ) {
387
388                 // A JVM Error is severe - best practice is to throw them up the chain. Set done to
389                 // true so no new notifications can be added to this task as we're about to bail.
390
391                 done = true;
392                 throw e;
393             }
394         }
395     }
396 }