X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=common%2Futil%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fyangtools%2Futil%2Fconcurrent%2FQueuedNotificationManager.java;h=2942799556ebeecc6babb2829c2c5e5f8a1921ee;hb=8f2876d895936b36aba1fc3ec65b18900e559184;hp=09d867b15c58551672cf0f7ff2534aa0215624e8;hpb=41c159ad750e969516f9b5fb4b7edeb5633eff7e;p=yangtools.git diff --git a/common/util/src/main/java/org/opendaylight/yangtools/util/concurrent/QueuedNotificationManager.java b/common/util/src/main/java/org/opendaylight/yangtools/util/concurrent/QueuedNotificationManager.java index 09d867b15c..2942799556 100644 --- a/common/util/src/main/java/org/opendaylight/yangtools/util/concurrent/QueuedNotificationManager.java +++ b/common/util/src/main/java/org/opendaylight/yangtools/util/concurrent/QueuedNotificationManager.java @@ -8,19 +8,26 @@ package org.opendaylight.yangtools.util.concurrent; -import com.google.common.base.Preconditions; -import java.util.ArrayList; +import static com.google.common.base.Preconditions.checkArgument; +import static java.util.Objects.requireNonNull; + +import com.google.common.collect.ImmutableList; +import java.util.ArrayDeque; +import java.util.Collection; import java.util.Collections; +import java.util.Iterator; import java.util.List; -import java.util.concurrent.BlockingQueue; +import java.util.Queue; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.Executor; -import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; +import java.util.stream.Collectors; +import javax.annotation.Nonnull; import javax.annotation.concurrent.GuardedBy; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -29,8 +36,8 @@ import org.slf4j.LoggerFactory; * This class manages queuing and dispatching notifications for multiple listeners concurrently. * Notifications are queued on a per-listener basis and dispatched serially to each listener via an * {@link Executor}. - *

- * This class optimizes its memory footprint by only allocating and maintaining a queue and executor + * + *

This class optimizes its memory footprint by only allocating and maintaining a queue and executor * task for a listener when there are pending notifications. On the first notification(s), a queue * is created and a task is submitted to the executor to dispatch the queue to the associated * listener. Any subsequent notifications that occur before all previous notifications have been @@ -42,7 +49,7 @@ import org.slf4j.LoggerFactory; * @param the listener type * @param the notification type */ -public class QueuedNotificationManager implements NotificationManager { +public class QueuedNotificationManager implements NotificationManager { /** * Interface implemented by clients that does the work of invoking listeners with notifications. @@ -51,35 +58,57 @@ public class QueuedNotificationManager implements NotificationManager * * @param the listener type * @param the notification type + * + * @deprecated Use {@link QueuedNotificationManager.BatchedInvoker} instead. */ - public interface Invoker { - + @Deprecated + @FunctionalInterface + public interface Invoker { /** * Called to invoke a listener with a notification. * * @param listener the listener to invoke * @param notification the notification to send */ - void invokeListener( L listener, N notification ); + void invokeListener(L listener, N notification); } - private static final Logger LOG = LoggerFactory.getLogger( QueuedNotificationManager.class ); + @FunctionalInterface + public interface BatchedInvoker { + /** + * Called to invoke a listener with a notification. + * + * @param listener the listener to invoke + * @param notifications notifications to send + */ + void invokeListener(@Nonnull L listener, @Nonnull Collection notifications); + } + + private static final Logger LOG = LoggerFactory.getLogger(QueuedNotificationManager.class); /** * Caps the maximum number of attempts to offer notification to a particular listener. Each * attempt window is 1 minute, so an offer times out after roughly 10 minutes. */ - private static final int MAX_NOTIFICATION_OFFER_ATTEMPTS = 10; + private static final int MAX_NOTIFICATION_OFFER_MINUTES = 10; + private static final long GIVE_UP_NANOS = TimeUnit.MINUTES.toNanos(MAX_NOTIFICATION_OFFER_MINUTES); + private static final long TASK_WAIT_NANOS = TimeUnit.MILLISECONDS.toNanos(10); + private final ConcurrentMap, NotificationTask> listenerCache = new ConcurrentHashMap<>(); + private final BatchedInvoker listenerInvoker; private final Executor executor; - private final Invoker listenerInvoker; - - private final ConcurrentMap,NotificationTask> - listenerCache = new ConcurrentHashMap<>(); - private final String name; private final int maxQueueCapacity; + private QueuedNotificationManager(final Executor executor, final BatchedInvoker listenerInvoker, + final int maxQueueCapacity, final String name) { + checkArgument(maxQueueCapacity > 0, "Invalid maxQueueCapacity %s must be > 0", maxQueueCapacity); + this.executor = requireNonNull(executor); + this.listenerInvoker = requireNonNull(listenerInvoker); + this.maxQueueCapacity = maxQueueCapacity; + this.name = requireNonNull(name); + } + /** * Constructor. * @@ -87,104 +116,129 @@ public class QueuedNotificationManager implements NotificationManager * @param listenerInvoker the {@link Invoker} to use for invoking listeners * @param maxQueueCapacity the capacity of each listener queue * @param name the name of this instance for logging info + * + * @deprecated Use {@link #create(Executor, BatchedInvoker, int, String)} instead. */ - public QueuedNotificationManager( Executor executor, Invoker listenerInvoker, - int maxQueueCapacity, String name ) { - this.executor = Preconditions.checkNotNull( executor ); - this.listenerInvoker = Preconditions.checkNotNull( listenerInvoker ); - Preconditions.checkArgument( maxQueueCapacity > 0, "maxQueueCapacity must be > 0 " ); - this.maxQueueCapacity = maxQueueCapacity; - this.name = Preconditions.checkNotNull( name ); + @Deprecated + @SuppressWarnings("checkstyle:illegalCatch") + public QueuedNotificationManager(final Executor executor, final Invoker listenerInvoker, + final int maxQueueCapacity, final String name) { + this(executor, (BatchedInvoker)(listener, notifications) -> notifications.forEach(n -> { + try { + listenerInvoker.invokeListener(listener, n); + } catch (Exception e) { + LOG.error("{}: Error notifying listener {} with {}", name, listener, n, e); + } + + }), maxQueueCapacity, name); + requireNonNull(listenerInvoker); + } + + /** + * Create a new notification manager. + * + * @param executor the {@link Executor} to use for notification tasks + * @param listenerInvoker the {@link BatchedInvoker} to use for invoking listeners + * @param maxQueueCapacity the capacity of each listener queue + * @param name the name of this instance for logging info + */ + public static QueuedNotificationManager create(final Executor executor, + final BatchedInvoker listenerInvoker, final int maxQueueCapacity, final String name) { + return new QueuedNotificationManager<>(executor, listenerInvoker, maxQueueCapacity, name); + } + + /** + * Returns the maximum listener queue capacity. + */ + public int getMaxQueueCapacity() { + return maxQueueCapacity; + } + + /** + * Returns the {@link Executor} to used for notification tasks. + */ + public Executor getExecutor() { + return executor; } /* (non-Javadoc) * @see org.opendaylight.yangtools.util.concurrent.NotificationManager#addNotification(L, N) */ @Override - public void submitNotification( final L listener, final N notification ) - throws RejectedExecutionException { - - if (notification == null) { - return; + public void submitNotification(final L listener, final N notification) throws RejectedExecutionException { + if (notification != null) { + submitNotifications(listener, Collections.singletonList(notification)); } - - submitNotifications( listener, Collections.singletonList(notification)); } /* (non-Javadoc) * @see org.opendaylight.yangtools.util.concurrent.NotificationManager#submitNotifications(L, java.util.Collection) */ @Override - public void submitNotifications( final L listener, final Iterable notifications ) + public void submitNotifications(final L listener, final Iterable notifications) throws RejectedExecutionException { if (notifications == null || listener == null) { return; } - if (LOG.isTraceEnabled()) { - LOG.trace( "{}: submitNotifications for listener {}: {}", - name, listener.toString(), notifications ); - } + LOG.trace("{}: submitNotifications for listener {}: {}", name, listener, notifications); - ListenerKey key = new ListenerKey<>( listener ); - NotificationTask newNotificationTask = null; + final ListenerKey key = new ListenerKey<>(listener); // Keep looping until we are either able to add a new NotificationTask or are able to // add our notifications to an existing NotificationTask. Eventually one or the other // will occur. - try { - while (true) { - NotificationTask existingTask = listenerCache.get( key ); - - if (existingTask == null || !existingTask.submitNotifications( notifications )) { - - // Either there's no existing task or we couldn't add our notifications to the - // existing one because it's in the process of exiting and removing itself from - // the cache. Either way try to put a new task in the cache. If we can't put - // then either the existing one is still there and hasn't removed itself quite - // yet or some other concurrent thread beat us to the put although this method - // shouldn't be called concurrently for the same listener as that would violate - // notification ordering. In any case loop back up and try again. - - if (newNotificationTask == null) { - newNotificationTask = new NotificationTask( key, notifications ); - } - - existingTask = listenerCache.putIfAbsent( key, newNotificationTask ); - if (existingTask == null) { + Iterator it = notifications.iterator(); + while (true) { + NotificationTask task = listenerCache.get(key); + if (task == null) { + // No task found, try to insert a new one + final NotificationTask newTask = new NotificationTask(key, it); + task = listenerCache.putIfAbsent(key, newTask); + if (task == null) { // We were able to put our new task - now submit it to the executor and - // we're done. If it throws a RejectedxecutionException, let that propagate + // we're done. If it throws a RejectedExecutionException, let that propagate // to the caller. + runTask(listener, newTask); + break; + } - LOG.debug( "{}: Submitting NotificationTask for listener {}", - name, listener.toString() ); + // We have a racing task, hence we can continue, but we need to refresh our iterator from + // the task. + it = newTask.recoverItems(); + } - executor.execute( newNotificationTask ); + final boolean completed = task.submitNotifications(it); + if (!completed) { + // Task is indicating it is exiting before it has consumed all the items and is exiting. Rather + // than spinning on removal, we try to replace it. + final NotificationTask newTask = new NotificationTask(key, it); + if (listenerCache.replace(key, task, newTask)) { + runTask(listener, newTask); break; } - } else { - // We were able to add our notifications to an existing task so we're done. - - break; + // We failed to replace the task, hence we need retry. Note we have to recover the items to be + // published from the new task. + it = newTask.recoverItems(); + LOG.debug("{}: retrying task queueing for {}", name, listener); + continue; } - } - } catch( InterruptedException e ) { + // All notifications have either been delivered or we have timed out and warned about the ones we + // have failed to deliver. In any case we are done here. + break; + } + } catch (InterruptedException e) { // We were interrupted trying to offer to the listener's queue. Somebody's probably // telling us to quit. - - LOG.debug( "{}: Interrupted trying to add to {} listener's queue", - name, listener.toString() ); + LOG.warn("{}: Interrupted trying to add to {} listener's queue", name, listener); } - if (LOG.isTraceEnabled()) { - LOG.trace( "{}: submitNotifications dine for listener {}", - name, listener.toString() ); - } + LOG.trace("{}: submitNotifications done for listener {}", name, listener); } /** @@ -192,27 +246,13 @@ public class QueuedNotificationManager implements NotificationManager * notification task in progress. */ public List getListenerNotificationQueueStats() { - List statsList = new ArrayList<>( listenerCache.size() ); - for (NotificationTask task: listenerCache.values()) { - statsList.add( new ListenerNotificationQueueStats( - task.listenerKey.toString(), task.notificationQueue.size() ) ); - } - - return statsList ; + return listenerCache.values().stream().map(t -> new ListenerNotificationQueueStats(t.listenerKey.toString(), + t.size())).collect(Collectors.toList()); } - /** - * Returns the maximum listener queue capacity. - */ - public int getMaxQueueCapacity(){ - return maxQueueCapacity; - } - - /** - * Returns the {@link Executor} to used for notification tasks. - */ - public Executor getExecutor(){ - return executor; + private void runTask(final L listener, final NotificationTask task) { + LOG.debug("{}: Submitting NotificationTask for listener {}", name, listener); + executor.execute(task); } /** @@ -222,12 +262,11 @@ public class QueuedNotificationManager implements NotificationManager * equals implementation that just blindly casts the other Object to compare instead of checking * for instanceof. */ - private static class ListenerKey { - + private static final class ListenerKey { private final L listener; - public ListenerKey( L listener ) { - this.listener = listener; + ListenerKey(final L listener) { + this.listener = requireNonNull(listener); } L getListener() { @@ -236,20 +275,15 @@ public class QueuedNotificationManager implements NotificationManager @Override public int hashCode() { - return System.identityHashCode( listener ); + return System.identityHashCode(listener); } @Override - public boolean equals( Object obj ) { + public boolean equals(final Object obj) { if (obj == this) { return true; } - if (!(obj instanceof ListenerKey)) { - return false; - } - - ListenerKey other = (ListenerKey) obj; - return listener == other.listener; + return obj instanceof ListenerKey && listener == ((ListenerKey) obj).listener; } @Override @@ -264,81 +298,100 @@ public class QueuedNotificationManager implements NotificationManager */ private class NotificationTask implements Runnable { - private final BlockingQueue notificationQueue; - - private volatile boolean done = false; - - @GuardedBy("queuingLock") - private boolean queuedNotifications = false; - - private final Lock queuingLock = new ReentrantLock(); - + private final Lock lock = new ReentrantLock(); + private final Condition notEmpty = lock.newCondition(); + private final Condition notFull = lock.newCondition(); private final ListenerKey listenerKey; - NotificationTask( ListenerKey listenerKey, Iterable notifications ) { + @GuardedBy("lock") + private final Queue queue = new ArrayDeque<>(); + @GuardedBy("lock") + private boolean exiting; - this.listenerKey = listenerKey; - this.notificationQueue = new LinkedBlockingQueue<>( maxQueueCapacity ); - - for (N notification: notifications) { - this.notificationQueue.add( notification ); + NotificationTask(final ListenerKey listenerKey, final Iterator notifications) { + this.listenerKey = requireNonNull(listenerKey); + while (notifications.hasNext()) { + queue.offer(notifications.next()); } } - boolean submitNotifications( Iterable notifications ) throws InterruptedException { + Iterator recoverItems() { + // This violates @GuardedBy annotation, but is invoked only when the task is not started and will never + // get started, hence this is safe. + return queue.iterator(); + } - queuingLock.lock(); + int size() { + lock.lock(); try { + return queue.size(); + } finally { + lock.unlock(); + } + } - // Check the done flag - if true then #run is in the process of exiting so return - // false to indicate such. Otherwise, offer the notifications to the queue. + boolean submitNotifications(final Iterator notifications) throws InterruptedException { + final long start = System.nanoTime(); + final long deadline = start + GIVE_UP_NANOS; - if (done) { - return false; - } + lock.lock(); + try { + // Lock may have blocked for some time, we need to take that into account. We may have exceedded + // the deadline, but that is unlikely and even in that case we can make some progress without further + // blocking. + long canWait = deadline - System.nanoTime(); - for (N notification: notifications) { - boolean notificationOfferAttemptSuccess = false; - // The offer is attempted for up to 10 minutes, with a status message printed each minute - for (int notificationOfferAttempts = 0; - notificationOfferAttempts < MAX_NOTIFICATION_OFFER_ATTEMPTS; notificationOfferAttempts++) { + while (true) { + // Check the exiting flag - if true then #run is in the process of exiting so return + // false to indicate such. Otherwise, offer the notifications to the queue. + if (exiting) { + return false; + } - // Try to offer for up to a minute and log a message if it times out. - if (LOG.isDebugEnabled()) { - LOG.debug( "{}: Offering notification to the queue for listener {}: {}", - name, listenerKey.toString(), notification ); + final int avail = maxQueueCapacity - queue.size(); + if (avail <= 0) { + if (canWait <= 0) { + LOG.warn("{}: Failed to offer notifications {} to the queue for listener {}. Exceeded" + + "maximum allowable time of {} minutes; the listener is likely in an unrecoverable" + + "state (deadlock or endless loop). ", name, ImmutableList.copyOf(notifications), + listenerKey, MAX_NOTIFICATION_OFFER_MINUTES); + return true; } - if (notificationOfferAttemptSuccess = notificationQueue.offer( - notification, 1, TimeUnit.MINUTES)) { - break; + canWait = notFull.awaitNanos(canWait); + continue; + } + + for (int i = 0; i < avail; ++i) { + if (!notifications.hasNext()) { + notEmpty.signal(); + return true; } - LOG.warn( - "{}: Timed out trying to offer a notification to the queue for listener {} " + - "on attempt {} of {}. " + - "The queue has reached its capacity of {}", - name, listenerKey.toString(), notificationOfferAttempts, MAX_NOTIFICATION_OFFER_ATTEMPTS, - maxQueueCapacity ); - } - if (!notificationOfferAttemptSuccess) { - LOG.warn( - "{}: Failed to offer a notification to the queue for listener {}. " + - "Exceeded max allowable attempts of {} in {} minutes; the listener " + - "is likely in an unrecoverable state (deadlock or endless loop).", - name, listenerKey.toString(), MAX_NOTIFICATION_OFFER_ATTEMPTS, - MAX_NOTIFICATION_OFFER_ATTEMPTS ); + queue.offer(notifications.next()); } } + } finally { + lock.unlock(); + } + } - // Set the queuedNotifications flag to tell #run that we've just queued - // notifications and not to exit yet, even if it thinks the queue is empty at this - // point. + @GuardedBy("lock") + private boolean waitForQueue() { + long timeout = TASK_WAIT_NANOS; - queuedNotifications = true; + while (queue.isEmpty()) { + if (timeout <= 0) { + return false; + } - } finally { - queuingLock.unlock(); + try { + timeout = notEmpty.awaitNanos(timeout); + } catch (InterruptedException e) { + // The executor is probably shutting down so log as debug. + LOG.debug("{}: Interrupted trying to remove from {} listener's queue", name, listenerKey); + return false; + } } return true; @@ -346,93 +399,44 @@ public class QueuedNotificationManager implements NotificationManager @Override public void run() { - try { // Loop until we've dispatched all the notifications in the queue. - while (true) { + final Collection notifications; - // Get the notification at the head of the queue, waiting a little bit for one - // to get offered. - - N notification = notificationQueue.poll( 10, TimeUnit.MILLISECONDS ); - if (notification == null) { - - // The queue is empty - try to get the queuingLock. If we can't get the lock - // then #submitNotifications is in the process of offering to the queue so - // we'll loop back up and poll the queue again. - - if (queuingLock.tryLock()) { - try { - - // Check the queuedNotifications flag to see if #submitNotifications - // has offered new notification(s) to the queue. If so, loop back up - // and poll the queue again. Otherwise set done to true and exit. - // Once we set the done flag and unlock, calls to - // #submitNotifications will fail and a new task will be created. - - if (!queuedNotifications) { - done = true; - break; - } - - // Clear the queuedNotifications flag so we'll try to exit the next - // time through the loop when the queue is empty. + lock.lock(); + try { + if (!waitForQueue()) { + exiting = true; + break; + } - queuedNotifications = false; + // Splice the entire queue + notifications = ImmutableList.copyOf(queue); + queue.clear(); - } finally { - queuingLock.unlock(); - } - } + notFull.signalAll(); + } finally { + lock.unlock(); } - notifyListener( notification ); + invokeListener(notifications); } - } catch( InterruptedException e ) { - - // The executor is probably shutting down so log as debug. - LOG.debug( "{}: Interrupted trying to remove from {} listener's queue", - name, listenerKey.toString() ); } finally { - // We're exiting, gracefully or not - either way make sure we always remove // ourselves from the cache. - - listenerCache.remove( listenerKey ); + listenerCache.remove(listenerKey, this); } } - private void notifyListener( N notification ) { - - if (notification == null) { - return; - } - + @SuppressWarnings("checkstyle:illegalCatch") + private void invokeListener(final Collection notifications) { + LOG.debug("{}: Invoking listener {} with notification: {}", name, listenerKey, notifications); try { - - if (LOG.isDebugEnabled()) { - LOG.debug( "{}: Invoking listener {} with notification: {}", - name, listenerKey.toString(), notification ); - } - - listenerInvoker.invokeListener( listenerKey.getListener(), notification ); - - } catch( RuntimeException e ) { - - // We'll let a RuntimeException from the listener slide and keep sending any - // remaining notifications. - - LOG.error( String.format( "%1$s: Error notifying listener %2$s", name, - listenerKey.toString() ), e ); - - } catch( Error e ) { - - // A JVM Error is severe - best practice is to throw them up the chain. Set done to - // true so no new notifications can be added to this task as we're about to bail. - - done = true; - throw e; + listenerInvoker.invokeListener(listenerKey.getListener(), notifications); + } catch (Exception e) { + // We'll let a RuntimeException from the listener slide and keep sending any remaining notifications. + LOG.error("{}: Error notifying listener {} with {}", name, listenerKey, notifications, e); } } }