package org.opendaylight.yangtools.util.concurrent;
+import com.google.common.base.Preconditions;
import java.util.ArrayList;
-import java.util.Arrays;
+import java.util.Collections;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
-
import javax.annotation.concurrent.GuardedBy;
-
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.google.common.base.Preconditions;
-
/**
* This class manages queuing and dispatching notifications for multiple listeners concurrently.
* Notifications are queued on a per-listener basis and dispatched serially to each listener via an
private static final Logger LOG = LoggerFactory.getLogger( QueuedNotificationManager.class );
+ /**
+ * Caps the maximum number of attempts to offer notification to a particular listener. Each
+ * attempt window is 1 minute, so an offer times out after roughly 10 minutes.
+ */
+ private static final int MAX_NOTIFICATION_OFFER_ATTEMPTS = 10;
+
private final Executor executor;
private final Invoker<L,N> listenerInvoker;
public void submitNotification( final L listener, final N notification )
throws RejectedExecutionException {
- if( notification == null ) {
+ if (notification == null) {
return;
}
- submitNotifications( listener, Arrays.asList( notification ) );
+ submitNotifications( listener, Collections.singletonList(notification));
}
/* (non-Javadoc)
public void submitNotifications( final L listener, final Iterable<N> notifications )
throws RejectedExecutionException {
- if( notifications == null || listener == null ) {
+ if (notifications == null || listener == null) {
return;
}
- if( LOG.isTraceEnabled() ) {
+ if (LOG.isTraceEnabled()) {
LOG.trace( "{}: submitNotifications for listener {}: {}",
- name, listener.getClass(), notifications );
+ name, listener.toString(), notifications );
}
ListenerKey<L> key = new ListenerKey<>( listener );
// will occur.
try {
- while( true ) {
+ while (true) {
NotificationTask existingTask = listenerCache.get( key );
- if( existingTask == null || !existingTask.submitNotifications( notifications ) ) {
+ if (existingTask == null || !existingTask.submitNotifications( notifications )) {
// Either there's no existing task or we couldn't add our notifications to the
// existing one because it's in the process of exiting and removing itself from
// shouldn't be called concurrently for the same listener as that would violate
// notification ordering. In any case loop back up and try again.
- if( newNotificationTask == null ) {
+ if (newNotificationTask == null) {
newNotificationTask = new NotificationTask( key, notifications );
}
existingTask = listenerCache.putIfAbsent( key, newNotificationTask );
- if( existingTask == null ) {
+ if (existingTask == null) {
// We were able to put our new task - now submit it to the executor and
// we're done. If it throws a RejectedxecutionException, let that propagate
// to the caller.
LOG.debug( "{}: Submitting NotificationTask for listener {}",
- name, listener.getClass() );
+ name, listener.toString() );
executor.execute( newNotificationTask );
break;
// telling us to quit.
LOG.debug( "{}: Interrupted trying to add to {} listener's queue",
- name, listener.getClass() );
+ name, listener.toString() );
}
- if( LOG.isTraceEnabled() ) {
+ if (LOG.isTraceEnabled()) {
LOG.trace( "{}: submitNotifications dine for listener {}",
- name, listener.getClass() );
+ name, listener.toString() );
}
}
*/
public List<ListenerNotificationQueueStats> getListenerNotificationQueueStats() {
List<ListenerNotificationQueueStats> statsList = new ArrayList<>( listenerCache.size() );
- for( NotificationTask task: listenerCache.values() ) {
+ for (NotificationTask task: listenerCache.values()) {
statsList.add( new ListenerNotificationQueueStats(
- task.listenerKey.getListener().getClass().getName(),
- task.notificationQueue.size() ) );
+ task.listenerKey.toString(), task.notificationQueue.size() ) );
}
return statsList ;
ListenerKey<?> other = (ListenerKey<?>) obj;
return listener == other.listener;
}
+
+ @Override
+ public String toString() {
+ return listener.toString();
+ }
}
/**
this.listenerKey = listenerKey;
this.notificationQueue = new LinkedBlockingQueue<>( maxQueueCapacity );
- for( N notification: notifications ) {
+ for (N notification: notifications) {
this.notificationQueue.add( notification );
}
}
// Check the done flag - if true then #run is in the process of exiting so return
// false to indicate such. Otherwise, offer the notifications to the queue.
- if( done ) {
+ if (done) {
return false;
}
- for( N notification: notifications ) {
-
- while( true ) {
+ for (N notification: notifications) {
+ boolean notificationOfferAttemptSuccess = false;
+ // The offer is attempted for up to 10 minutes, with a status message printed each minute
+ for (int notificationOfferAttempts = 0;
+ notificationOfferAttempts < MAX_NOTIFICATION_OFFER_ATTEMPTS; notificationOfferAttempts++) {
// Try to offer for up to a minute and log a message if it times out.
-
- // FIXME: we loop forever to guarantee delivery however this leaves it open
- // for 1 rogue listener to bring everyone to a halt. Another option is to
- // limit the tries and give up after a while and drop the notification.
- // Given a reasonably large queue capacity and long timeout, if we still
- // can't queue then most likely the listener is an unrecoverable state
- // (deadlock or endless loop).
-
- if( LOG.isDebugEnabled() ) {
+ if (LOG.isDebugEnabled()) {
LOG.debug( "{}: Offering notification to the queue for listener {}: {}",
- name, listenerKey.getListener().getClass(), notification );
+ name, listenerKey.toString(), notification );
}
- if( notificationQueue.offer( notification, 1, TimeUnit.MINUTES ) ) {
+ if (notificationOfferAttemptSuccess = notificationQueue.offer(
+ notification, 1, TimeUnit.MINUTES)) {
break;
}
LOG.warn(
- "{}: Timed out trying to offer a notification to the queue for listener {}." +
+ "{}: Timed out trying to offer a notification to the queue for listener {} " +
+ "on attempt {} of {}. " +
"The queue has reached its capacity of {}",
- name, listenerKey.getListener().getClass(), maxQueueCapacity );
+ name, listenerKey.toString(), notificationOfferAttempts, MAX_NOTIFICATION_OFFER_ATTEMPTS,
+ maxQueueCapacity );
+ }
+ if (!notificationOfferAttemptSuccess) {
+ LOG.warn(
+ "{}: Failed to offer a notification to the queue for listener {}. " +
+ "Exceeded max allowable attempts of {} in {} minutes; the listener " +
+ "is likely in an unrecoverable state (deadlock or endless loop).",
+ name, listenerKey.toString(), MAX_NOTIFICATION_OFFER_ATTEMPTS,
+ MAX_NOTIFICATION_OFFER_ATTEMPTS );
}
}
try {
// Loop until we've dispatched all the notifications in the queue.
- while( true ) {
+ while (true) {
// Get the notification at the head of the queue, waiting a little bit for one
// to get offered.
N notification = notificationQueue.poll( 10, TimeUnit.MILLISECONDS );
- if( notification == null ) {
+ if (notification == null) {
// The queue is empty - try to get the queuingLock. If we can't get the lock
// then #submitNotifications is in the process of offering to the queue so
// we'll loop back up and poll the queue again.
- if( queuingLock.tryLock() ) {
+ if (queuingLock.tryLock()) {
try {
// Check the queuedNotifications flag to see if #submitNotifications
// Once we set the done flag and unlock, calls to
// #submitNotifications will fail and a new task will be created.
- if( !queuedNotifications ) {
+ if (!queuedNotifications) {
done = true;
break;
}
// The executor is probably shutting down so log as debug.
LOG.debug( "{}: Interrupted trying to remove from {} listener's queue",
- name, listenerKey.getListener().getClass() );
+ name, listenerKey.toString() );
} finally {
// We're exiting, gracefully or not - either way make sure we always remove
private void notifyListener( N notification ) {
- if( notification == null ) {
+ if (notification == null) {
return;
}
try {
- if( LOG.isDebugEnabled() ) {
+ if (LOG.isDebugEnabled()) {
LOG.debug( "{}: Invoking listener {} with notification: {}",
- name, listenerKey.getListener().getClass(), notification );
+ name, listenerKey.toString(), notification );
}
listenerInvoker.invokeListener( listenerKey.getListener(), notification );
// remaining notifications.
LOG.error( String.format( "%1$s: Error notifying listener %2$s", name,
- listenerKey.getListener().getClass() ), e );
+ listenerKey.toString() ), e );
} catch( Error e ) {