2 * Copyright (c) 2014 Brocade Communications Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.yangtools.util.concurrent;
11 import java.util.Arrays;
12 import java.util.concurrent.BlockingQueue;
13 import java.util.concurrent.ConcurrentHashMap;
14 import java.util.concurrent.ConcurrentMap;
15 import java.util.concurrent.Executor;
16 import java.util.concurrent.LinkedBlockingQueue;
17 import java.util.concurrent.RejectedExecutionException;
18 import java.util.concurrent.TimeUnit;
19 import java.util.concurrent.locks.Lock;
20 import java.util.concurrent.locks.ReentrantLock;
22 import javax.annotation.concurrent.GuardedBy;
24 import org.slf4j.Logger;
25 import org.slf4j.LoggerFactory;
27 import com.google.common.base.Preconditions;
30 * This class manages queuing and dispatching notifications for multiple listeners concurrently.
31 * Notifications are queued on a per-listener basis and dispatched serially to each listener via an
34 * This class optimizes its memory footprint by only allocating and maintaining a queue and executor
35 * task for a listener when there are pending notifications. On the first notification(s), a queue
36 * is created and a task is submitted to the executor to dispatch the queue to the associated
37 * listener. Any subsequent notifications that occur before all previous notifications have been
38 * dispatched are appended to the existing queue. When all notifications have been dispatched, the
39 * queue and task are discarded.
41 * @author Thomas Pantelis
43 * @param <L> the listener type
44 * @param <N> the notification type
46 public class QueuedNotificationManager<L,N> implements NotificationManager<L,N> {
49 * Interface implemented by clients that does the work of invoking listeners with notifications.
51 * @author Thomas Pantelis
53 * @param <L> the listener type
54 * @param <N> the notification type
56 public interface Invoker<L,N> {
59 * Called to invoke a listener with a notification.
61 * @param listener the listener to invoke
62 * @param notification the notification to send
64 void invokeListener( L listener, N notification );
67 private static final Logger LOG = LoggerFactory.getLogger( QueuedNotificationManager.class );
69 private final Executor executor;
70 private final Invoker<L,N> listenerInvoker;
72 private final ConcurrentMap<ListenerKey<L>,NotificationTask>
73 listenerCache = new ConcurrentHashMap<>();
75 private final String name;
76 private final int maxQueueCapacity;
81 * @param executor the {@link Executor} to use for notification tasks
82 * @param listenerInvoker the {@link Invoker} to use for invoking listeners
83 * @param maxQueueCapacity the capacity of each listener queue
84 * @param name the name of this instance for logging info
86 public QueuedNotificationManager( Executor executor, Invoker<L,N> listenerInvoker,
87 int maxQueueCapacity, String name ) {
88 this.executor = Preconditions.checkNotNull( executor );
89 this.listenerInvoker = Preconditions.checkNotNull( listenerInvoker );
90 Preconditions.checkArgument( maxQueueCapacity > 0, "maxQueueCapacity must be > 0 " );
91 this.maxQueueCapacity = maxQueueCapacity;
92 this.name = Preconditions.checkNotNull( name );
96 * @see org.opendaylight.yangtools.util.concurrent.NotificationManager#addNotification(L, N)
99 public void submitNotification( final L listener, final N notification )
100 throws RejectedExecutionException {
102 if( notification == null ) {
106 submitNotifications( listener, Arrays.asList( notification ) );
110 * @see org.opendaylight.yangtools.util.concurrent.NotificationManager#submitNotifications(L, java.util.Collection)
113 public void submitNotifications( final L listener, final Iterable<N> notifications )
114 throws RejectedExecutionException {
116 if( notifications == null || listener == null ) {
120 if( LOG.isTraceEnabled() ) {
121 LOG.trace( "{}: submitNotifications for listener {}: {}",
122 name, listener.getClass(), notifications );
125 ListenerKey<L> key = new ListenerKey<>( listener );
126 NotificationTask newNotificationTask = null;
128 // Keep looping until we are either able to add a new NotificationTask or are able to
129 // add our notifications to an existing NotificationTask. Eventually one or the other
134 NotificationTask existingTask = listenerCache.get( key );
136 if( existingTask == null || !existingTask.submitNotifications( notifications ) ) {
138 // Either there's no existing task or we couldn't add our notifications to the
139 // existing one because it's in the process of exiting and removing itself from
140 // the cache. Either way try to put a new task in the cache. If we can't put
141 // then either the existing one is still there and hasn't removed itself quite
142 // yet or some other concurrent thread beat us to the put although this method
143 // shouldn't be called concurrently for the same listener as that would violate
144 // notification ordering. In any case loop back up and try again.
146 if( newNotificationTask == null ) {
147 newNotificationTask = new NotificationTask( key, notifications );
150 existingTask = listenerCache.putIfAbsent( key, newNotificationTask );
151 if( existingTask == null ) {
153 // We were able to put our new task - now submit it to the executor and
154 // we're done. If it throws a RejectedxecutionException, let that propagate
157 LOG.debug( "{}: Submitting NotificationTask for listener {}",
158 name, listener.getClass() );
160 executor.execute( newNotificationTask );
165 // We were able to add our notifications to an existing task so we're done.
170 } catch( InterruptedException e ) {
172 // We were interrupted trying to offer to the listener's queue. Somebody's probably
173 // telling us to quit.
175 LOG.debug( "{}: Interrupted trying to add to {} listener's queue",
176 name, listener.getClass() );
179 if( LOG.isTraceEnabled() ) {
180 LOG.trace( "{}: submitNotifications dine for listener {}",
181 name, listener.getClass() );
186 * Used as the listenerCache map key. We key by listener reference identity hashCode/equals.
187 * Since we don't know anything about the listener class implementations and we're mixing
188 * multiple listener class instances in the same map, this avoids any potential issue with an
189 * equals implementation that just blindly casts the other Object to compare instead of checking
192 private static class ListenerKey<L> {
194 private final L listener;
196 public ListenerKey( L listener ) {
197 this.listener = listener;
205 public int hashCode() {
206 return System.identityHashCode( listener );
210 public boolean equals( Object obj ) {
214 if (!(obj instanceof ListenerKey<?>)) {
218 ListenerKey<?> other = (ListenerKey<?>) obj;
219 return listener == other.listener;
224 * Executor task for a single listener that queues notifications and sends them serially to the
227 private class NotificationTask implements Runnable {
229 private final BlockingQueue<N> notificationQueue;
231 private volatile boolean done = false;
233 @GuardedBy("queuingLock")
234 private boolean queuedNotifications = false;
236 private final Lock queuingLock = new ReentrantLock();
238 private final ListenerKey<L> listenerKey;
240 NotificationTask( ListenerKey<L> listenerKey, Iterable<N> notifications ) {
242 this.listenerKey = listenerKey;
243 this.notificationQueue = new LinkedBlockingQueue<>( maxQueueCapacity );
245 for( N notification: notifications ) {
246 this.notificationQueue.add( notification );
250 boolean submitNotifications( Iterable<N> notifications ) throws InterruptedException {
255 // Check the done flag - if true then #run is in the process of exiting so return
256 // false to indicate such. Otherwise, offer the notifications to the queue.
262 for( N notification: notifications ) {
266 // Try to offer for up to a minute and log a message if it times out.
268 // FIXME: we loop forever to guarantee delivery however this leaves it open
269 // for 1 rogue listener to bring everyone to a halt. Another option is to
270 // limit the tries and give up after a while and drop the notification.
271 // Given a reasonably large queue capacity and long timeout, if we still
272 // can't queue then most likely the listener is an unrecoverable state
273 // (deadlock or endless loop).
275 if( LOG.isDebugEnabled() ) {
276 LOG.debug( "{}: Offering notification to the queue for listener {}: {}",
277 name, listenerKey.getListener().getClass(), notification );
280 if( notificationQueue.offer( notification, 1, TimeUnit.MINUTES ) ) {
285 "{}: Timed out trying to offer a notification to the queue for listener {}." +
286 "The queue has reached its capacity of {}",
287 name, listenerKey.getListener().getClass(), maxQueueCapacity );
291 // Set the queuedNotifications flag to tell #run that we've just queued
292 // notifications and not to exit yet, even if it thinks the queue is empty at this
295 queuedNotifications = true;
298 queuingLock.unlock();
308 // Loop until we've dispatched all the notifications in the queue.
312 // Get the notification at the head of the queue, waiting a little bit for one
315 N notification = notificationQueue.poll( 10, TimeUnit.MILLISECONDS );
316 if( notification == null ) {
318 // The queue is empty - try to get the queuingLock. If we can't get the lock
319 // then #submitNotifications is in the process of offering to the queue so
320 // we'll loop back up and poll the queue again.
322 if( queuingLock.tryLock() ) {
325 // Check the queuedNotifications flag to see if #submitNotifications
326 // has offered new notification(s) to the queue. If so, loop back up
327 // and poll the queue again. Otherwise set done to true and exit.
328 // Once we set the done flag and unlock, calls to
329 // #submitNotifications will fail and a new task will be created.
331 if( !queuedNotifications ) {
336 // Clear the queuedNotifications flag so we'll try to exit the next
337 // time through the loop when the queue is empty.
339 queuedNotifications = false;
342 queuingLock.unlock();
347 notifyListener( notification );
349 } catch( InterruptedException e ) {
351 // The executor is probably shutting down so log as debug.
352 LOG.debug( "{}: Interrupted trying to remove from {} listener's queue",
353 name, listenerKey.getListener().getClass() );
356 // We're exiting, gracefully or not - either way make sure we always remove
357 // ourselves from the cache.
359 listenerCache.remove( listenerKey );
363 private void notifyListener( N notification ) {
365 if( notification == null ) {
371 if( LOG.isDebugEnabled() ) {
372 LOG.debug( "{}: Invoking listener {} with notification: {}",
373 name, listenerKey.getListener().getClass(), notification );
376 listenerInvoker.invokeListener( listenerKey.getListener(), notification );
378 } catch( RuntimeException e ) {
380 // We'll let a RuntimeException from the listener slide and keep sending any
381 // remaining notifications.
383 LOG.error( String.format( "%1$s: Error notifying listener %2$s", name,
384 listenerKey.getListener().getClass() ), e );
388 // A JVM Error is severe - best practice is to throw them up the chain. Set done to
389 // true so no new notifications can be added to this task as we're about to bail.