package org.opendaylight.yangtools.util.concurrent;
+import java.util.ArrayList;
import java.util.Arrays;
+import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
if( LOG.isTraceEnabled() ) {
LOG.trace( "{}: submitNotifications for listener {}: {}",
- name, listener.getClass(), notifications );
+ name, listener.toString(), notifications );
}
ListenerKey<L> key = new ListenerKey<>( listener );
// to the caller.
LOG.debug( "{}: Submitting NotificationTask for listener {}",
- name, listener.getClass() );
+ name, listener.toString() );
executor.execute( newNotificationTask );
break;
// telling us to quit.
LOG.debug( "{}: Interrupted trying to add to {} listener's queue",
- name, listener.getClass() );
+ name, listener.toString() );
}
if( LOG.isTraceEnabled() ) {
LOG.trace( "{}: submitNotifications dine for listener {}",
- name, listener.getClass() );
+ name, listener.toString() );
}
}
+ /**
+ * Returns {@link ListenerNotificationQueueStats} instances for each current listener
+ * notification task in progress.
+ */
+ public List<ListenerNotificationQueueStats> getListenerNotificationQueueStats() {
+ List<ListenerNotificationQueueStats> statsList = new ArrayList<>( listenerCache.size() );
+ for( NotificationTask task: listenerCache.values() ) {
+ statsList.add( new ListenerNotificationQueueStats(
+ task.listenerKey.toString(), task.notificationQueue.size() ) );
+ }
+
+ return statsList ;
+ }
+
+ /**
+ * Returns the maximum listener queue capacity.
+ */
+ public int getMaxQueueCapacity(){
+ return maxQueueCapacity;
+ }
+
+ /**
+ * Returns the {@link Executor} to used for notification tasks.
+ */
+ public Executor getExecutor(){
+ return executor;
+ }
+
/**
* Used as the listenerCache map key. We key by listener reference identity hashCode/equals.
* Since we don't know anything about the listener class implementations and we're mixing
ListenerKey<?> other = (ListenerKey<?>) obj;
return listener == other.listener;
}
+
+ @Override
+ public String toString() {
+ return listener.toString();
+ }
}
/**
if( LOG.isDebugEnabled() ) {
LOG.debug( "{}: Offering notification to the queue for listener {}: {}",
- name, listenerKey.getListener().getClass(), notification );
+ name, listenerKey.toString(), notification );
}
if( notificationQueue.offer( notification, 1, TimeUnit.MINUTES ) ) {
LOG.warn(
"{}: Timed out trying to offer a notification to the queue for listener {}." +
"The queue has reached its capacity of {}",
- name, listenerKey.getListener().getClass(), maxQueueCapacity );
+ name, listenerKey.toString(), maxQueueCapacity );
}
}
// The executor is probably shutting down so log as debug.
LOG.debug( "{}: Interrupted trying to remove from {} listener's queue",
- name, listenerKey.getListener().getClass() );
+ name, listenerKey.toString() );
} finally {
// We're exiting, gracefully or not - either way make sure we always remove
if( LOG.isDebugEnabled() ) {
LOG.debug( "{}: Invoking listener {} with notification: {}",
- name, listenerKey.getListener().getClass(), notification );
+ name, listenerKey.toString(), notification );
}
listenerInvoker.invokeListener( listenerKey.getListener(), notification );
// remaining notifications.
LOG.error( String.format( "%1$s: Error notifying listener %2$s", name,
- listenerKey.getListener().getClass() ), e );
+ listenerKey.toString() ), e );
} catch( Error e ) {