/* * Copyright (c) 2014 Brocade Communications Systems, Inc. and others. All rights reserved. * * This program and the accompanying materials are made available under the * terms of the Eclipse Public License v1.0 which accompanies this distribution, * and is available at http://www.eclipse.org/legal/epl-v10.html */ package org.opendaylight.yangtools.util.concurrent; import java.util.ArrayList; import java.util.Collections; import java.util.List; import java.util.concurrent.BlockingQueue; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.Executor; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; import javax.annotation.concurrent.GuardedBy; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.google.common.base.Preconditions; /** * This class manages queuing and dispatching notifications for multiple listeners concurrently. * Notifications are queued on a per-listener basis and dispatched serially to each listener via an * {@link Executor}. *

* This class optimizes its memory footprint by only allocating and maintaining a queue and executor * task for a listener when there are pending notifications. On the first notification(s), a queue * is created and a task is submitted to the executor to dispatch the queue to the associated * listener. Any subsequent notifications that occur before all previous notifications have been * dispatched are appended to the existing queue. When all notifications have been dispatched, the * queue and task are discarded. * * @author Thomas Pantelis * * @param the listener type * @param the notification type */ public class QueuedNotificationManager implements NotificationManager { /** * Interface implemented by clients that does the work of invoking listeners with notifications. * * @author Thomas Pantelis * * @param the listener type * @param the notification type */ public interface Invoker { /** * Called to invoke a listener with a notification. * * @param listener the listener to invoke * @param notification the notification to send */ void invokeListener( L listener, N notification ); } private static final Logger LOG = LoggerFactory.getLogger( QueuedNotificationManager.class ); private final Executor executor; private final Invoker listenerInvoker; private final ConcurrentMap,NotificationTask> listenerCache = new ConcurrentHashMap<>(); private final String name; private final int maxQueueCapacity; /** * Constructor. * * @param executor the {@link Executor} to use for notification tasks * @param listenerInvoker the {@link Invoker} to use for invoking listeners * @param maxQueueCapacity the capacity of each listener queue * @param name the name of this instance for logging info */ public QueuedNotificationManager( Executor executor, Invoker listenerInvoker, int maxQueueCapacity, String name ) { this.executor = Preconditions.checkNotNull( executor ); this.listenerInvoker = Preconditions.checkNotNull( listenerInvoker ); Preconditions.checkArgument( maxQueueCapacity > 0, "maxQueueCapacity must be > 0 " ); this.maxQueueCapacity = maxQueueCapacity; this.name = Preconditions.checkNotNull( name ); } /* (non-Javadoc) * @see org.opendaylight.yangtools.util.concurrent.NotificationManager#addNotification(L, N) */ @Override public void submitNotification( final L listener, final N notification ) throws RejectedExecutionException { if( notification == null ) { return; } submitNotifications( listener, Collections.singletonList(notification)); } /* (non-Javadoc) * @see org.opendaylight.yangtools.util.concurrent.NotificationManager#submitNotifications(L, java.util.Collection) */ @Override public void submitNotifications( final L listener, final Iterable notifications ) throws RejectedExecutionException { if( notifications == null || listener == null ) { return; } if( LOG.isTraceEnabled() ) { LOG.trace( "{}: submitNotifications for listener {}: {}", name, listener.toString(), notifications ); } ListenerKey key = new ListenerKey<>( listener ); NotificationTask newNotificationTask = null; // Keep looping until we are either able to add a new NotificationTask or are able to // add our notifications to an existing NotificationTask. Eventually one or the other // will occur. try { while( true ) { NotificationTask existingTask = listenerCache.get( key ); if( existingTask == null || !existingTask.submitNotifications( notifications ) ) { // Either there's no existing task or we couldn't add our notifications to the // existing one because it's in the process of exiting and removing itself from // the cache. Either way try to put a new task in the cache. If we can't put // then either the existing one is still there and hasn't removed itself quite // yet or some other concurrent thread beat us to the put although this method // shouldn't be called concurrently for the same listener as that would violate // notification ordering. In any case loop back up and try again. if( newNotificationTask == null ) { newNotificationTask = new NotificationTask( key, notifications ); } existingTask = listenerCache.putIfAbsent( key, newNotificationTask ); if( existingTask == null ) { // We were able to put our new task - now submit it to the executor and // we're done. If it throws a RejectedxecutionException, let that propagate // to the caller. LOG.debug( "{}: Submitting NotificationTask for listener {}", name, listener.toString() ); executor.execute( newNotificationTask ); break; } } else { // We were able to add our notifications to an existing task so we're done. break; } } } catch( InterruptedException e ) { // We were interrupted trying to offer to the listener's queue. Somebody's probably // telling us to quit. LOG.debug( "{}: Interrupted trying to add to {} listener's queue", name, listener.toString() ); } if( LOG.isTraceEnabled() ) { LOG.trace( "{}: submitNotifications dine for listener {}", name, listener.toString() ); } } /** * Returns {@link ListenerNotificationQueueStats} instances for each current listener * notification task in progress. */ public List getListenerNotificationQueueStats() { List statsList = new ArrayList<>( listenerCache.size() ); for( NotificationTask task: listenerCache.values() ) { statsList.add( new ListenerNotificationQueueStats( task.listenerKey.toString(), task.notificationQueue.size() ) ); } return statsList ; } /** * Returns the maximum listener queue capacity. */ public int getMaxQueueCapacity(){ return maxQueueCapacity; } /** * Returns the {@link Executor} to used for notification tasks. */ public Executor getExecutor(){ return executor; } /** * Used as the listenerCache map key. We key by listener reference identity hashCode/equals. * Since we don't know anything about the listener class implementations and we're mixing * multiple listener class instances in the same map, this avoids any potential issue with an * equals implementation that just blindly casts the other Object to compare instead of checking * for instanceof. */ private static class ListenerKey { private final L listener; public ListenerKey( L listener ) { this.listener = listener; } L getListener() { return listener; } @Override public int hashCode() { return System.identityHashCode( listener ); } @Override public boolean equals( Object obj ) { if (obj == this) { return true; } if (!(obj instanceof ListenerKey)) { return false; } ListenerKey other = (ListenerKey) obj; return listener == other.listener; } @Override public String toString() { return listener.toString(); } } /** * Executor task for a single listener that queues notifications and sends them serially to the * listener. */ private class NotificationTask implements Runnable { private final BlockingQueue notificationQueue; private volatile boolean done = false; @GuardedBy("queuingLock") private boolean queuedNotifications = false; private final Lock queuingLock = new ReentrantLock(); private final ListenerKey listenerKey; NotificationTask( ListenerKey listenerKey, Iterable notifications ) { this.listenerKey = listenerKey; this.notificationQueue = new LinkedBlockingQueue<>( maxQueueCapacity ); for( N notification: notifications ) { this.notificationQueue.add( notification ); } } boolean submitNotifications( Iterable notifications ) throws InterruptedException { queuingLock.lock(); try { // Check the done flag - if true then #run is in the process of exiting so return // false to indicate such. Otherwise, offer the notifications to the queue. if( done ) { return false; } for( N notification: notifications ) { while( true ) { // Try to offer for up to a minute and log a message if it times out. // FIXME: we loop forever to guarantee delivery however this leaves it open // for 1 rogue listener to bring everyone to a halt. Another option is to // limit the tries and give up after a while and drop the notification. // Given a reasonably large queue capacity and long timeout, if we still // can't queue then most likely the listener is an unrecoverable state // (deadlock or endless loop). if( LOG.isDebugEnabled() ) { LOG.debug( "{}: Offering notification to the queue for listener {}: {}", name, listenerKey.toString(), notification ); } if( notificationQueue.offer( notification, 1, TimeUnit.MINUTES ) ) { break; } LOG.warn( "{}: Timed out trying to offer a notification to the queue for listener {}." + "The queue has reached its capacity of {}", name, listenerKey.toString(), maxQueueCapacity ); } } // Set the queuedNotifications flag to tell #run that we've just queued // notifications and not to exit yet, even if it thinks the queue is empty at this // point. queuedNotifications = true; } finally { queuingLock.unlock(); } return true; } @Override public void run() { try { // Loop until we've dispatched all the notifications in the queue. while( true ) { // Get the notification at the head of the queue, waiting a little bit for one // to get offered. N notification = notificationQueue.poll( 10, TimeUnit.MILLISECONDS ); if( notification == null ) { // The queue is empty - try to get the queuingLock. If we can't get the lock // then #submitNotifications is in the process of offering to the queue so // we'll loop back up and poll the queue again. if( queuingLock.tryLock() ) { try { // Check the queuedNotifications flag to see if #submitNotifications // has offered new notification(s) to the queue. If so, loop back up // and poll the queue again. Otherwise set done to true and exit. // Once we set the done flag and unlock, calls to // #submitNotifications will fail and a new task will be created. if( !queuedNotifications ) { done = true; break; } // Clear the queuedNotifications flag so we'll try to exit the next // time through the loop when the queue is empty. queuedNotifications = false; } finally { queuingLock.unlock(); } } } notifyListener( notification ); } } catch( InterruptedException e ) { // The executor is probably shutting down so log as debug. LOG.debug( "{}: Interrupted trying to remove from {} listener's queue", name, listenerKey.toString() ); } finally { // We're exiting, gracefully or not - either way make sure we always remove // ourselves from the cache. listenerCache.remove( listenerKey ); } } private void notifyListener( N notification ) { if( notification == null ) { return; } try { if( LOG.isDebugEnabled() ) { LOG.debug( "{}: Invoking listener {} with notification: {}", name, listenerKey.toString(), notification ); } listenerInvoker.invokeListener( listenerKey.getListener(), notification ); } catch( RuntimeException e ) { // We'll let a RuntimeException from the listener slide and keep sending any // remaining notifications. LOG.error( String.format( "%1$s: Error notifying listener %2$s", name, listenerKey.toString() ), e ); } catch( Error e ) { // A JVM Error is severe - best practice is to throw them up the chain. Set done to // true so no new notifications can be added to this task as we're about to bail. done = true; throw e; } } } }