package org.opendaylight.yangtools.util.concurrent;
-import java.util.Arrays;
-import java.util.concurrent.BlockingQueue;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
+import java.util.ArrayDeque;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executor;
-import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
-
+import java.util.stream.Collectors;
+import javax.annotation.Nonnull;
import javax.annotation.concurrent.GuardedBy;
-
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.google.common.base.Preconditions;
-
/**
* This class manages queuing and dispatching notifications for multiple listeners concurrently.
* Notifications are queued on a per-listener basis and dispatched serially to each listener via an
* {@link Executor}.
- * <p>
- * This class optimizes its memory footprint by only allocating and maintaining a queue and executor
+ *
+ * <p>This class optimizes its memory footprint by only allocating and maintaining a queue and executor
* task for a listener when there are pending notifications. On the first notification(s), a queue
* is created and a task is submitted to the executor to dispatch the queue to the associated
* listener. Any subsequent notifications that occur before all previous notifications have been
* @param <L> the listener type
* @param <N> the notification type
*/
-public class QueuedNotificationManager<L,N> implements NotificationManager<L,N> {
+public class QueuedNotificationManager<L, N> implements NotificationManager<L, N> {
/**
* Interface implemented by clients that does the work of invoking listeners with notifications.
*
* @param <L> the listener type
* @param <N> the notification type
+ *
+ * @deprecated Use {@link QueuedNotificationManager.BatchedInvoker} instead.
*/
- public interface Invoker<L,N> {
-
+ @Deprecated
+ @FunctionalInterface
+ public interface Invoker<L, N> {
/**
* Called to invoke a listener with a notification.
*
* @param listener the listener to invoke
* @param notification the notification to send
*/
- void invokeListener( L listener, N notification );
+ void invokeListener(L listener, N notification);
}
- private static final Logger LOG = LoggerFactory.getLogger( QueuedNotificationManager.class );
+ @FunctionalInterface
+ public interface BatchedInvoker<L, N> {
+ /**
+ * Called to invoke a listener with a notification.
+ *
+ * @param listener the listener to invoke
+ * @param notifications notifications to send
+ */
+ void invokeListener(@Nonnull L listener, @Nonnull Collection<? extends N> notifications);
+ }
- private final Executor executor;
- private final Invoker<L,N> listenerInvoker;
+ private static final Logger LOG = LoggerFactory.getLogger(QueuedNotificationManager.class);
- private final ConcurrentMap<ListenerKey<L>,NotificationTask>
- listenerCache = new ConcurrentHashMap<>();
+ /**
+ * Caps the maximum number of attempts to offer notification to a particular listener. Each
+ * attempt window is 1 minute, so an offer times out after roughly 10 minutes.
+ */
+ private static final int MAX_NOTIFICATION_OFFER_MINUTES = 10;
+ private static final long GIVE_UP_NANOS = TimeUnit.MINUTES.toNanos(MAX_NOTIFICATION_OFFER_MINUTES);
+ private static final long TASK_WAIT_NANOS = TimeUnit.MILLISECONDS.toNanos(10);
+ private final ConcurrentMap<ListenerKey<L>, NotificationTask> listenerCache = new ConcurrentHashMap<>();
+ private final BatchedInvoker<L, N> listenerInvoker;
+ private final Executor executor;
private final String name;
private final int maxQueueCapacity;
+ private QueuedNotificationManager(final Executor executor, final BatchedInvoker<L, N> listenerInvoker,
+ final int maxQueueCapacity, final String name) {
+ Preconditions.checkArgument(maxQueueCapacity > 0, "Invalid maxQueueCapacity %s must be > 0", maxQueueCapacity);
+ this.executor = Preconditions.checkNotNull(executor);
+ this.listenerInvoker = Preconditions.checkNotNull(listenerInvoker);
+ this.maxQueueCapacity = maxQueueCapacity;
+ this.name = Preconditions.checkNotNull(name);
+ }
+
/**
* Constructor.
*
* @param listenerInvoker the {@link Invoker} to use for invoking listeners
* @param maxQueueCapacity the capacity of each listener queue
* @param name the name of this instance for logging info
+ *
+ * @deprecated Use {@link #create(Executor, BatchedInvoker, int, String)} instead.
*/
- public QueuedNotificationManager( Executor executor, Invoker<L,N> listenerInvoker,
- int maxQueueCapacity, String name ) {
- this.executor = Preconditions.checkNotNull( executor );
- this.listenerInvoker = Preconditions.checkNotNull( listenerInvoker );
- Preconditions.checkArgument( maxQueueCapacity > 0, "maxQueueCapacity must be > 0 " );
- this.maxQueueCapacity = maxQueueCapacity;
- this.name = Preconditions.checkNotNull( name );
+ @Deprecated
+ @SuppressWarnings("checkstyle:illegalCatch")
+ public QueuedNotificationManager(final Executor executor, final Invoker<L, N> listenerInvoker,
+ final int maxQueueCapacity, final String name) {
+ this(executor, (BatchedInvoker<L, N>)(listener, notifications) -> notifications.forEach(n -> {
+ try {
+ listenerInvoker.invokeListener(listener, n);
+ } catch (Exception e) {
+ LOG.error("{}: Error notifying listener {} with {}", name, listener, n, e);
+ }
+
+ }), maxQueueCapacity, name);
+ Preconditions.checkNotNull(listenerInvoker);
+ }
+
+ /**
+ * Create a new notification manager.
+ *
+ * @param executor the {@link Executor} to use for notification tasks
+ * @param listenerInvoker the {@link BatchedInvoker} to use for invoking listeners
+ * @param maxQueueCapacity the capacity of each listener queue
+ * @param name the name of this instance for logging info
+ */
+ public static <L, N> QueuedNotificationManager<L, N> create(final Executor executor,
+ final BatchedInvoker<L, N> listenerInvoker, final int maxQueueCapacity, final String name) {
+ return new QueuedNotificationManager<>(executor, listenerInvoker, maxQueueCapacity, name);
+ }
+
+ /**
+ * Returns the maximum listener queue capacity.
+ */
+ public int getMaxQueueCapacity() {
+ return maxQueueCapacity;
+ }
+
+ /**
+ * Returns the {@link Executor} to used for notification tasks.
+ */
+ public Executor getExecutor() {
+ return executor;
}
/* (non-Javadoc)
* @see org.opendaylight.yangtools.util.concurrent.NotificationManager#addNotification(L, N)
*/
@Override
- public void submitNotification( final L listener, final N notification )
- throws RejectedExecutionException {
-
- if( notification == null ) {
- return;
+ public void submitNotification(final L listener, final N notification) throws RejectedExecutionException {
+ if (notification != null) {
+ submitNotifications(listener, Collections.singletonList(notification));
}
-
- submitNotifications( listener, Arrays.asList( notification ) );
}
/* (non-Javadoc)
* @see org.opendaylight.yangtools.util.concurrent.NotificationManager#submitNotifications(L, java.util.Collection)
*/
@Override
- public void submitNotifications( final L listener, final Iterable<N> notifications )
+ public void submitNotifications(final L listener, final Iterable<N> notifications)
throws RejectedExecutionException {
- if( notifications == null || listener == null ) {
+ if (notifications == null || listener == null) {
return;
}
- if( LOG.isTraceEnabled() ) {
- LOG.trace( "{}: submitNotifications for listener {}: {}",
- name, listener.getClass(), notifications );
- }
+ LOG.trace("{}: submitNotifications for listener {}: {}", name, listener, notifications);
- ListenerKey<L> key = new ListenerKey<>( listener );
- NotificationTask newNotificationTask = null;
+ final ListenerKey<L> key = new ListenerKey<>(listener);
// Keep looping until we are either able to add a new NotificationTask or are able to
// add our notifications to an existing NotificationTask. Eventually one or the other
// will occur.
-
try {
- while( true ) {
- NotificationTask existingTask = listenerCache.get( key );
-
- if( existingTask == null || !existingTask.submitNotifications( notifications ) ) {
-
- // Either there's no existing task or we couldn't add our notifications to the
- // existing one because it's in the process of exiting and removing itself from
- // the cache. Either way try to put a new task in the cache. If we can't put
- // then either the existing one is still there and hasn't removed itself quite
- // yet or some other concurrent thread beat us to the put although this method
- // shouldn't be called concurrently for the same listener as that would violate
- // notification ordering. In any case loop back up and try again.
-
- if( newNotificationTask == null ) {
- newNotificationTask = new NotificationTask( key, notifications );
- }
-
- existingTask = listenerCache.putIfAbsent( key, newNotificationTask );
- if( existingTask == null ) {
-
+ Iterator<N> it = notifications.iterator();
+
+ while (true) {
+ NotificationTask task = listenerCache.get(key);
+ if (task == null) {
+ // No task found, try to insert a new one
+ final NotificationTask newTask = new NotificationTask(key, it);
+ task = listenerCache.putIfAbsent(key, newTask);
+ if (task == null) {
// We were able to put our new task - now submit it to the executor and
- // we're done. If it throws a RejectedxecutionException, let that propagate
+ // we're done. If it throws a RejectedExecutionException, let that propagate
// to the caller.
+ runTask(listener, newTask);
+ break;
+ }
- LOG.debug( "{}: Submitting NotificationTask for listener {}",
- name, listener.getClass() );
+ // We have a racing task, hence we can continue, but we need to refresh our iterator from
+ // the task.
+ it = newTask.recoverItems();
+ }
- executor.execute( newNotificationTask );
+ final boolean completed = task.submitNotifications(it);
+ if (!completed) {
+ // Task is indicating it is exiting before it has consumed all the items and is exiting. Rather
+ // than spinning on removal, we try to replace it.
+ final NotificationTask newTask = new NotificationTask(key, it);
+ if (listenerCache.replace(key, task, newTask)) {
+ runTask(listener, newTask);
break;
}
- } else {
-
- // We were able to add our notifications to an existing task so we're done.
- break;
+ // We failed to replace the task, hence we need retry. Note we have to recover the items to be
+ // published from the new task.
+ it = newTask.recoverItems();
+ LOG.debug("{}: retrying task queueing for {}", name, listener);
+ continue;
}
- }
- } catch( InterruptedException e ) {
+ // All notifications have either been delivered or we have timed out and warned about the ones we
+ // have failed to deliver. In any case we are done here.
+ break;
+ }
+ } catch (InterruptedException e) {
// We were interrupted trying to offer to the listener's queue. Somebody's probably
// telling us to quit.
-
- LOG.debug( "{}: Interrupted trying to add to {} listener's queue",
- name, listener.getClass() );
+ LOG.warn("{}: Interrupted trying to add to {} listener's queue", name, listener);
}
- if( LOG.isTraceEnabled() ) {
- LOG.trace( "{}: submitNotifications dine for listener {}",
- name, listener.getClass() );
- }
+ LOG.trace("{}: submitNotifications done for listener {}", name, listener);
+ }
+
+ /**
+ * Returns {@link ListenerNotificationQueueStats} instances for each current listener
+ * notification task in progress.
+ */
+ public List<ListenerNotificationQueueStats> getListenerNotificationQueueStats() {
+ return listenerCache.values().stream().map(t -> new ListenerNotificationQueueStats(t.listenerKey.toString(),
+ t.size())).collect(Collectors.toList());
+ }
+
+ private void runTask(final L listener, final NotificationTask task) {
+ LOG.debug("{}: Submitting NotificationTask for listener {}", name, listener);
+ executor.execute(task);
}
/**
* equals implementation that just blindly casts the other Object to compare instead of checking
* for instanceof.
*/
- private static class ListenerKey<L> {
-
+ private static final class ListenerKey<L> {
private final L listener;
- public ListenerKey( L listener ) {
- this.listener = listener;
+ ListenerKey(final L listener) {
+ this.listener = Preconditions.checkNotNull(listener);
}
L getListener() {
@Override
public int hashCode() {
- return System.identityHashCode( listener );
+ return System.identityHashCode(listener);
+ }
+
+ @Override
+ public boolean equals(final Object obj) {
+ if (obj == this) {
+ return true;
+ }
+ return obj instanceof ListenerKey<?> && listener == ((ListenerKey<?>) obj).listener;
}
@Override
- public boolean equals( Object obj ) {
- ListenerKey<?> other = (ListenerKey<?>) obj;
- return listener == other.listener;
+ public String toString() {
+ return listener.toString();
}
}
*/
private class NotificationTask implements Runnable {
- private final BlockingQueue<N> notificationQueue;
-
- private volatile boolean done = false;
-
- @GuardedBy("queuingLock")
- private boolean queuedNotifications = false;
-
- private final Lock queuingLock = new ReentrantLock();
-
+ private final Lock lock = new ReentrantLock();
+ private final Condition notEmpty = lock.newCondition();
+ private final Condition notFull = lock.newCondition();
private final ListenerKey<L> listenerKey;
- NotificationTask( ListenerKey<L> listenerKey, Iterable<N> notifications ) {
-
- this.listenerKey = listenerKey;
- this.notificationQueue = new LinkedBlockingQueue<>( maxQueueCapacity );
+ @GuardedBy("lock")
+ private final Queue<N> queue = new ArrayDeque<>();
+ @GuardedBy("lock")
+ private boolean exiting;
- for( N notification: notifications ) {
- this.notificationQueue.add( notification );
+ NotificationTask(final ListenerKey<L> listenerKey, final Iterator<N> notifications) {
+ this.listenerKey = Preconditions.checkNotNull(listenerKey);
+ while (notifications.hasNext()) {
+ queue.offer(notifications.next());
}
}
- boolean submitNotifications( Iterable<N> notifications ) throws InterruptedException {
+ Iterator<N> recoverItems() {
+ // This violates @GuardedBy annotation, but is invoked only when the task is not started and will never
+ // get started, hence this is safe.
+ return queue.iterator();
+ }
- queuingLock.lock();
+ int size() {
+ lock.lock();
try {
+ return queue.size();
+ } finally {
+ lock.unlock();
+ }
+ }
- // Check the done flag - if true then #run is in the process of exiting so return
- // false to indicate such. Otherwise, offer the notifications to the queue.
-
- if( done ) {
- return false;
- }
-
- for( N notification: notifications ) {
-
- while( true ) {
-
- // Try to offer for up to a minute and log a message if it times out.
+ boolean submitNotifications(final Iterator<N> notifications) throws InterruptedException {
+ final long start = System.nanoTime();
+ final long deadline = start + GIVE_UP_NANOS;
- // FIXME: we loop forever to guarantee delivery however this leaves it open
- // for 1 rogue listener to bring everyone to a halt. Another option is to
- // limit the tries and give up after a while and drop the notification.
- // Given a reasonably large queue capacity and long timeout, if we still
- // can't queue then most likely the listener is an unrecoverable state
- // (deadlock or endless loop).
+ lock.lock();
+ try {
+ // Lock may have blocked for some time, we need to take that into account. We may have exceedded
+ // the deadline, but that is unlikely and even in that case we can make some progress without further
+ // blocking.
+ long canWait = deadline - System.nanoTime();
+
+ while (true) {
+ // Check the exiting flag - if true then #run is in the process of exiting so return
+ // false to indicate such. Otherwise, offer the notifications to the queue.
+ if (exiting) {
+ return false;
+ }
- if( LOG.isDebugEnabled() ) {
- LOG.debug( "{}: Offering notification to the queue for listener {}: {}",
- name, listenerKey.getListener().getClass(), notification );
+ final int avail = maxQueueCapacity - queue.size();
+ if (avail <= 0) {
+ if (canWait <= 0) {
+ LOG.warn("{}: Failed to offer notifications {} to the queue for listener {}. Exceeded"
+ + "maximum allowable time of {} minutes; the listener is likely in an unrecoverable"
+ + "state (deadlock or endless loop). ", name, ImmutableList.copyOf(notifications),
+ listenerKey, MAX_NOTIFICATION_OFFER_MINUTES);
+ return true;
}
- if( notificationQueue.offer( notification, 1, TimeUnit.MINUTES ) ) {
- break;
+ canWait = notFull.awaitNanos(canWait);
+ continue;
+ }
+
+ for (int i = 0; i < avail; ++i) {
+ if (!notifications.hasNext()) {
+ notEmpty.signal();
+ return true;
}
- LOG.warn(
- "{}: Timed out trying to offer a notification to the queue for listener {}." +
- "The queue has reached its capacity of {}",
- name, listenerKey.getListener().getClass(), maxQueueCapacity );
+ queue.offer(notifications.next());
}
}
+ } finally {
+ lock.unlock();
+ }
+ }
- // Set the queuedNotifications flag to tell #run that we've just queued
- // notifications and not to exit yet, even if it thinks the queue is empty at this
- // point.
+ @GuardedBy("lock")
+ private boolean waitForQueue() {
+ long timeout = TASK_WAIT_NANOS;
- queuedNotifications = true;
+ while (queue.isEmpty()) {
+ if (timeout <= 0) {
+ return false;
+ }
- } finally {
- queuingLock.unlock();
+ try {
+ timeout = notEmpty.awaitNanos(timeout);
+ } catch (InterruptedException e) {
+ // The executor is probably shutting down so log as debug.
+ LOG.debug("{}: Interrupted trying to remove from {} listener's queue", name, listenerKey);
+ return false;
+ }
}
return true;
@Override
public void run() {
-
try {
// Loop until we've dispatched all the notifications in the queue.
+ while (true) {
+ final Collection<N> notifications;
- while( true ) {
-
- // Get the notification at the head of the queue, waiting a little bit for one
- // to get offered.
-
- N notification = notificationQueue.poll( 10, TimeUnit.MILLISECONDS );
- if( notification == null ) {
-
- // The queue is empty - try to get the queuingLock. If we can't get the lock
- // then #submitNotifications is in the process of offering to the queue so
- // we'll loop back up and poll the queue again.
-
- if( queuingLock.tryLock() ) {
- try {
-
- // Check the queuedNotifications flag to see if #submitNotifications
- // has offered new notification(s) to the queue. If so, loop back up
- // and poll the queue again. Otherwise set done to true and exit.
- // Once we set the done flag and unlock, calls to
- // #submitNotifications will fail and a new task will be created.
-
- if( !queuedNotifications ) {
- done = true;
- break;
- }
-
- // Clear the queuedNotifications flag so we'll try to exit the next
- // time through the loop when the queue is empty.
+ lock.lock();
+ try {
+ if (!waitForQueue()) {
+ exiting = true;
+ break;
+ }
- queuedNotifications = false;
+ // Splice the entire queue
+ notifications = ImmutableList.copyOf(queue);
+ queue.clear();
- } finally {
- queuingLock.unlock();
- }
- }
+ notFull.signalAll();
+ } finally {
+ lock.unlock();
}
- notifyListener( notification );
+ invokeListener(notifications);
}
- } catch( InterruptedException e ) {
-
- // The executor is probably shutting down so log as debug.
- LOG.debug( "{}: Interrupted trying to remove from {} listener's queue",
- name, listenerKey.getListener().getClass() );
} finally {
-
// We're exiting, gracefully or not - either way make sure we always remove
// ourselves from the cache.
-
- listenerCache.remove( listenerKey );
+ listenerCache.remove(listenerKey, this);
}
}
- private void notifyListener( N notification ) {
-
- if( notification == null ) {
- return;
- }
-
+ @SuppressWarnings("checkstyle:illegalCatch")
+ private void invokeListener(final Collection<N> notifications) {
+ LOG.debug("{}: Invoking listener {} with notification: {}", name, listenerKey, notifications);
try {
-
- if( LOG.isDebugEnabled() ) {
- LOG.debug( "{}: Invoking listener {} with notification: {}",
- name, listenerKey.getListener().getClass(), notification );
- }
-
- listenerInvoker.invokeListener( listenerKey.getListener(), notification );
-
- } catch( RuntimeException e ) {
-
- // We'll let a RuntimeException from the listener slide and keep sending any
- // remaining notifications.
-
- LOG.error( String.format( "%1$s: Error notifying listener %2$s", name,
- listenerKey.getListener().getClass() ), e );
-
- } catch( Error e ) {
-
- // A JVM Error is severe - best practice is to throw them up the chain. Set done to
- // true so no new notifications can be added to this task as we're about to bail.
-
- done = true;
- throw e;
+ listenerInvoker.invokeListener(listenerKey.getListener(), notifications);
+ } catch (Exception e) {
+ // We'll let a RuntimeException from the listener slide and keep sending any remaining notifications.
+ LOG.error("{}: Error notifying listener {} with {}", name, listenerKey, notifications, e);
}
}
}