Cleanup use of Guava library
[yangtools.git] / common / util / src / main / java / org / opendaylight / yangtools / util / concurrent / QueuedNotificationManager.java
index a939840d6255497b47a9356d5f8b133b52c61290..2942799556ebeecc6babb2829c2c5e5f8a1921ee 100644 (file)
@@ -8,30 +8,36 @@
 
 package org.opendaylight.yangtools.util.concurrent;
 
-import java.util.Arrays;
-import java.util.concurrent.BlockingQueue;
+import static com.google.common.base.Preconditions.checkArgument;
+import static java.util.Objects.requireNonNull;
+
+import com.google.common.collect.ImmutableList;
+import java.util.ArrayDeque;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Queue;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.Executor;
-import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Condition;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
-
+import java.util.stream.Collectors;
+import javax.annotation.Nonnull;
 import javax.annotation.concurrent.GuardedBy;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.common.base.Preconditions;
-
 /**
  * This class manages queuing and dispatching notifications for multiple listeners concurrently.
  * Notifications are queued on a per-listener basis and dispatched serially to each listener via an
  * {@link Executor}.
- * <p>
- * This class optimizes its memory footprint by only allocating and maintaining a queue and executor
+ *
+ * <p>This class optimizes its memory footprint by only allocating and maintaining a queue and executor
  * task for a listener when there are pending notifications. On the first notification(s), a queue
  * is created and a task is submitted to the executor to dispatch the queue to the associated
  * listener. Any subsequent notifications that occur before all previous notifications have been
@@ -43,7 +49,7 @@ import com.google.common.base.Preconditions;
  * @param <L> the listener type
  * @param <N> the notification type
  */
-public class QueuedNotificationManager<L,N> implements NotificationManager<L,N> {
+public class QueuedNotificationManager<L, N> implements NotificationManager<L, N> {
 
     /**
      * Interface implemented by clients that does the work of invoking listeners with notifications.
@@ -52,29 +58,57 @@ public class QueuedNotificationManager<L,N> implements NotificationManager<L,N>
      *
      * @param <L> the listener type
      * @param <N> the notification type
+     *
+     * @deprecated Use {@link QueuedNotificationManager.BatchedInvoker} instead.
      */
-    public interface Invoker<L,N> {
-
+    @Deprecated
+    @FunctionalInterface
+    public interface Invoker<L, N> {
         /**
          * Called to invoke a listener with a notification.
          *
          * @param listener the listener to invoke
          * @param notification the notification to send
          */
-        void invokeListener( L listener, N notification );
+        void invokeListener(L listener, N notification);
     }
 
-    private static final Logger LOG = LoggerFactory.getLogger( QueuedNotificationManager.class );
+    @FunctionalInterface
+    public interface BatchedInvoker<L, N> {
+        /**
+         * Called to invoke a listener with a notification.
+         *
+         * @param listener the listener to invoke
+         * @param notifications notifications to send
+         */
+        void invokeListener(@Nonnull L listener, @Nonnull Collection<? extends N> notifications);
+    }
 
-    private final Executor executor;
-    private final Invoker<L,N> listenerInvoker;
+    private static final Logger LOG = LoggerFactory.getLogger(QueuedNotificationManager.class);
 
-    private final ConcurrentMap<ListenerKey<L>,NotificationTask>
-                                                          listenerCache = new ConcurrentHashMap<>();
+    /**
+     * Caps the maximum number of attempts to offer notification to a particular listener.  Each
+     * attempt window is 1 minute, so an offer times out after roughly 10 minutes.
+     */
+    private static final int MAX_NOTIFICATION_OFFER_MINUTES = 10;
+    private static final long GIVE_UP_NANOS = TimeUnit.MINUTES.toNanos(MAX_NOTIFICATION_OFFER_MINUTES);
+    private static final long TASK_WAIT_NANOS = TimeUnit.MILLISECONDS.toNanos(10);
 
+    private final ConcurrentMap<ListenerKey<L>, NotificationTask> listenerCache = new ConcurrentHashMap<>();
+    private final BatchedInvoker<L, N> listenerInvoker;
+    private final Executor executor;
     private final String name;
     private final int maxQueueCapacity;
 
+    private QueuedNotificationManager(final Executor executor, final BatchedInvoker<L, N> listenerInvoker,
+            final int maxQueueCapacity, final String name) {
+        checkArgument(maxQueueCapacity > 0, "Invalid maxQueueCapacity %s must be > 0", maxQueueCapacity);
+        this.executor = requireNonNull(executor);
+        this.listenerInvoker = requireNonNull(listenerInvoker);
+        this.maxQueueCapacity = maxQueueCapacity;
+        this.name = requireNonNull(name);
+    }
+
     /**
      * Constructor.
      *
@@ -82,104 +116,143 @@ public class QueuedNotificationManager<L,N> implements NotificationManager<L,N>
      * @param listenerInvoker the {@link Invoker} to use for invoking listeners
      * @param maxQueueCapacity the capacity of each listener queue
      * @param name the name of this instance for logging info
+     *
+     * @deprecated Use {@link #create(Executor, BatchedInvoker, int, String)} instead.
      */
-    public QueuedNotificationManager( Executor executor, Invoker<L,N> listenerInvoker,
-            int maxQueueCapacity, String name ) {
-        this.executor = Preconditions.checkNotNull( executor );
-        this.listenerInvoker = Preconditions.checkNotNull( listenerInvoker );
-        Preconditions.checkArgument( maxQueueCapacity > 0, "maxQueueCapacity must be > 0 " );
-        this.maxQueueCapacity = maxQueueCapacity;
-        this.name = Preconditions.checkNotNull( name );
+    @Deprecated
+    @SuppressWarnings("checkstyle:illegalCatch")
+    public QueuedNotificationManager(final Executor executor, final Invoker<L, N> listenerInvoker,
+            final int maxQueueCapacity, final String name) {
+        this(executor, (BatchedInvoker<L, N>)(listener, notifications) -> notifications.forEach(n -> {
+            try {
+                listenerInvoker.invokeListener(listener, n);
+            } catch (Exception e) {
+                LOG.error("{}: Error notifying listener {} with {}", name, listener, n, e);
+            }
+
+        }), maxQueueCapacity, name);
+        requireNonNull(listenerInvoker);
+    }
+
+    /**
+     * Create a new notification manager.
+     *
+     * @param executor the {@link Executor} to use for notification tasks
+     * @param listenerInvoker the {@link BatchedInvoker} to use for invoking listeners
+     * @param maxQueueCapacity the capacity of each listener queue
+     * @param name the name of this instance for logging info
+     */
+    public static <L, N> QueuedNotificationManager<L, N> create(final Executor executor,
+            final BatchedInvoker<L, N> listenerInvoker, final int maxQueueCapacity, final String name) {
+        return new QueuedNotificationManager<>(executor, listenerInvoker, maxQueueCapacity, name);
+    }
+
+    /**
+     * Returns the maximum listener queue capacity.
+     */
+    public int getMaxQueueCapacity() {
+        return maxQueueCapacity;
+    }
+
+    /**
+     * Returns the {@link Executor} to used for notification tasks.
+     */
+    public Executor getExecutor() {
+        return executor;
     }
 
     /* (non-Javadoc)
      * @see org.opendaylight.yangtools.util.concurrent.NotificationManager#addNotification(L, N)
      */
     @Override
-    public void submitNotification( final L listener, final N notification )
-            throws RejectedExecutionException {
-
-        if( notification == null ) {
-            return;
+    public void submitNotification(final L listener, final N notification) throws RejectedExecutionException {
+        if (notification != null) {
+            submitNotifications(listener, Collections.singletonList(notification));
         }
-
-        submitNotifications( listener, Arrays.asList( notification ) );
     }
 
     /* (non-Javadoc)
      * @see org.opendaylight.yangtools.util.concurrent.NotificationManager#submitNotifications(L, java.util.Collection)
      */
     @Override
-    public void submitNotifications( final L listener, final Iterable<N> notifications )
+    public void submitNotifications(final L listener, final Iterable<N> notifications)
             throws RejectedExecutionException {
 
-        if( notifications == null || listener == null ) {
+        if (notifications == null || listener == null) {
             return;
         }
 
-        if( LOG.isTraceEnabled() ) {
-            LOG.trace( "{}: submitNotifications for listener {}: {}",
-                       name, listener.getClass(), notifications );
-        }
+        LOG.trace("{}: submitNotifications for listener {}: {}", name, listener, notifications);
 
-        ListenerKey<L> key = new ListenerKey<>( listener );
-        NotificationTask newNotificationTask = null;
+        final ListenerKey<L> key = new ListenerKey<>(listener);
 
         // Keep looping until we are either able to add a new NotificationTask or are able to
         // add our notifications to an existing NotificationTask. Eventually one or the other
         // will occur.
-
         try {
-            while( true ) {
-                NotificationTask existingTask = listenerCache.get( key );
-
-                if( existingTask == null || !existingTask.submitNotifications( notifications ) ) {
-
-                    // Either there's no existing task or we couldn't add our notifications to the
-                    // existing one because it's in the process of exiting and removing itself from
-                    // the cache. Either way try to put a new task in the cache. If we can't put
-                    // then either the existing one is still there and hasn't removed itself quite
-                    // yet or some other concurrent thread beat us to the put although this method
-                    // shouldn't be called concurrently for the same listener as that would violate
-                    // notification ordering. In any case loop back up and try again.
-
-                    if( newNotificationTask == null ) {
-                        newNotificationTask = new NotificationTask( key, notifications );
-                    }
-
-                    existingTask = listenerCache.putIfAbsent( key, newNotificationTask );
-                    if( existingTask == null ) {
-
+            Iterator<N> it = notifications.iterator();
+
+            while (true) {
+                NotificationTask task = listenerCache.get(key);
+                if (task == null) {
+                    // No task found, try to insert a new one
+                    final NotificationTask newTask = new NotificationTask(key, it);
+                    task = listenerCache.putIfAbsent(key, newTask);
+                    if (task == null) {
                         // We were able to put our new task - now submit it to the executor and
-                        // we're done. If it throws a RejectedxecutionException, let that propagate
+                        // we're done. If it throws a RejectedExecutionException, let that propagate
                         // to the caller.
+                        runTask(listener, newTask);
+                        break;
+                    }
 
-                        LOG.debug( "{}: Submitting NotificationTask for listener {}",
-                                   name, listener.getClass() );
+                    // We have a racing task, hence we can continue, but we need to refresh our iterator from
+                    // the task.
+                    it = newTask.recoverItems();
+                }
 
-                        executor.execute( newNotificationTask );
+                final boolean completed = task.submitNotifications(it);
+                if (!completed) {
+                    // Task is indicating it is exiting before it has consumed all the items and is exiting. Rather
+                    // than spinning on removal, we try to replace it.
+                    final NotificationTask newTask = new NotificationTask(key, it);
+                    if (listenerCache.replace(key, task, newTask)) {
+                        runTask(listener, newTask);
                         break;
                     }
-                } else {
 
-                    // We were able to add our notifications to an existing task so we're done.
-
-                    break;
+                    // We failed to replace the task, hence we need retry. Note we have to recover the items to be
+                    // published from the new task.
+                    it = newTask.recoverItems();
+                    LOG.debug("{}: retrying task queueing for {}", name, listener);
+                    continue;
                 }
-            }
-        } catch( InterruptedException e ) {
 
+                // All notifications have either been delivered or we have timed out and warned about the ones we
+                // have failed to deliver. In any case we are done here.
+                break;
+            }
+        } catch (InterruptedException e) {
             // We were interrupted trying to offer to the listener's queue. Somebody's probably
             // telling us to quit.
-
-            LOG.debug( "{}: Interrupted trying to add to {} listener's queue",
-                       name, listener.getClass() );
+            LOG.warn("{}: Interrupted trying to add to {} listener's queue", name, listener);
         }
 
-        if( LOG.isTraceEnabled() ) {
-            LOG.trace( "{}: submitNotifications dine for listener {}",
-                       name, listener.getClass() );
-        }
+        LOG.trace("{}: submitNotifications done for listener {}", name, listener);
+    }
+
+    /**
+     * Returns {@link ListenerNotificationQueueStats} instances for each current listener
+     * notification task in progress.
+     */
+    public List<ListenerNotificationQueueStats> getListenerNotificationQueueStats() {
+        return listenerCache.values().stream().map(t -> new ListenerNotificationQueueStats(t.listenerKey.toString(),
+            t.size())).collect(Collectors.toList());
+    }
+
+    private void runTask(final L listener, final NotificationTask task) {
+        LOG.debug("{}: Submitting NotificationTask for listener {}", name, listener);
+        executor.execute(task);
     }
 
     /**
@@ -189,12 +262,11 @@ public class QueuedNotificationManager<L,N> implements NotificationManager<L,N>
      * equals implementation that just blindly casts the other Object to compare instead of checking
      * for instanceof.
      */
-    private static class ListenerKey<L> {
-
+    private static final class ListenerKey<L> {
         private final L listener;
 
-        public ListenerKey( L listener ) {
-            this.listener = listener;
+        ListenerKey(final L listener) {
+            this.listener = requireNonNull(listener);
         }
 
         L getListener() {
@@ -203,20 +275,20 @@ public class QueuedNotificationManager<L,N> implements NotificationManager<L,N>
 
         @Override
         public int hashCode() {
-            return System.identityHashCode( listener );
+            return System.identityHashCode(listener);
         }
 
         @Override
-        public boolean equals( Object obj ) {
+        public boolean equals(final Object obj) {
             if (obj == this) {
                 return true;
             }
-            if (!(obj instanceof ListenerKey<?>)) {
-                return false;
-            }
+            return obj instanceof ListenerKey<?> && listener == ((ListenerKey<?>) obj).listener;
+        }
 
-            ListenerKey<?> other = (ListenerKey<?>) obj;
-            return listener == other.listener;
+        @Override
+        public String toString() {
+            return listener.toString();
         }
     }
 
@@ -226,76 +298,100 @@ public class QueuedNotificationManager<L,N> implements NotificationManager<L,N>
      */
     private class NotificationTask implements Runnable {
 
-        private final BlockingQueue<N> notificationQueue;
-
-        private volatile boolean done = false;
-
-        @GuardedBy("queuingLock")
-        private boolean queuedNotifications = false;
-
-        private final Lock queuingLock = new ReentrantLock();
-
+        private final Lock lock = new ReentrantLock();
+        private final Condition notEmpty = lock.newCondition();
+        private final Condition notFull = lock.newCondition();
         private final ListenerKey<L> listenerKey;
 
-        NotificationTask( ListenerKey<L> listenerKey, Iterable<N> notifications ) {
+        @GuardedBy("lock")
+        private final Queue<N> queue = new ArrayDeque<>();
+        @GuardedBy("lock")
+        private boolean exiting;
 
-            this.listenerKey = listenerKey;
-            this.notificationQueue = new LinkedBlockingQueue<>( maxQueueCapacity );
-
-            for( N notification: notifications ) {
-                this.notificationQueue.add( notification );
+        NotificationTask(final ListenerKey<L> listenerKey, final Iterator<N> notifications) {
+            this.listenerKey = requireNonNull(listenerKey);
+            while (notifications.hasNext()) {
+                queue.offer(notifications.next());
             }
         }
 
-        boolean submitNotifications( Iterable<N> notifications ) throws InterruptedException {
+        Iterator<N> recoverItems() {
+            // This violates @GuardedBy annotation, but is invoked only when the task is not started and will never
+            // get started, hence this is safe.
+            return queue.iterator();
+        }
 
-            queuingLock.lock();
+        int size() {
+            lock.lock();
             try {
+                return queue.size();
+            } finally {
+                lock.unlock();
+            }
+        }
 
-                // Check the done flag - if true then #run is in the process of exiting so return
-                // false to indicate such. Otherwise, offer the notifications to the queue.
-
-                if( done ) {
-                    return false;
-                }
-
-                for( N notification: notifications ) {
-
-                    while( true ) {
-
-                        // Try to offer for up to a minute and log a message if it times out.
+        boolean submitNotifications(final Iterator<N> notifications) throws InterruptedException {
+            final long start = System.nanoTime();
+            final long deadline = start + GIVE_UP_NANOS;
 
-                        // FIXME: we loop forever to guarantee delivery however this leaves it open
-                        // for 1 rogue listener to bring everyone to a halt. Another option is to
-                        // limit the tries and give up after a while and drop the notification.
-                        // Given a reasonably large queue capacity and long timeout, if we still
-                        // can't queue then most likely the listener is an unrecoverable state
-                        // (deadlock or endless loop).
+            lock.lock();
+            try {
+                // Lock may have blocked for some time, we need to take that into account. We may have exceedded
+                // the deadline, but that is unlikely and even in that case we can make some progress without further
+                // blocking.
+                long canWait = deadline - System.nanoTime();
+
+                while (true) {
+                    // Check the exiting flag - if true then #run is in the process of exiting so return
+                    // false to indicate such. Otherwise, offer the notifications to the queue.
+                    if (exiting) {
+                        return false;
+                    }
 
-                        if( LOG.isDebugEnabled() ) {
-                            LOG.debug( "{}: Offering notification to the queue for listener {}: {}",
-                                       name, listenerKey.getListener().getClass(), notification );
+                    final int avail = maxQueueCapacity - queue.size();
+                    if (avail <= 0) {
+                        if (canWait <= 0) {
+                            LOG.warn("{}: Failed to offer notifications {} to the queue for listener {}. Exceeded"
+                                + "maximum allowable time of {} minutes; the listener is likely in an unrecoverable"
+                                + "state (deadlock or endless loop). ", name, ImmutableList.copyOf(notifications),
+                                listenerKey, MAX_NOTIFICATION_OFFER_MINUTES);
+                            return true;
                         }
 
-                        if( notificationQueue.offer( notification, 1, TimeUnit.MINUTES ) ) {
-                            break;
+                        canWait = notFull.awaitNanos(canWait);
+                        continue;
+                    }
+
+                    for (int i = 0; i < avail; ++i) {
+                        if (!notifications.hasNext()) {
+                            notEmpty.signal();
+                            return true;
                         }
 
-                        LOG.warn(
-                            "{}: Timed out trying to offer a notification to the queue for listener {}." +
-                            "The queue has reached its capacity of {}",
-                            name, listenerKey.getListener().getClass(), maxQueueCapacity );
+                        queue.offer(notifications.next());
                     }
                 }
+            } finally {
+                lock.unlock();
+            }
+        }
 
-                // Set the queuedNotifications flag to tell #run that we've just queued
-                // notifications and not to exit yet, even if it thinks the queue is empty at this
-                // point.
+        @GuardedBy("lock")
+        private boolean waitForQueue() {
+            long timeout = TASK_WAIT_NANOS;
 
-                queuedNotifications = true;
+            while (queue.isEmpty()) {
+                if (timeout <= 0) {
+                    return false;
+                }
 
-            } finally {
-                queuingLock.unlock();
+                try {
+                    timeout = notEmpty.awaitNanos(timeout);
+                } catch (InterruptedException e) {
+                    // The executor is probably shutting down so log as debug.
+                    LOG.debug("{}: Interrupted trying to remove from {} listener's queue", name, listenerKey);
+                    return false;
+                }
             }
 
             return true;
@@ -303,93 +399,44 @@ public class QueuedNotificationManager<L,N> implements NotificationManager<L,N>
 
         @Override
         public void run() {
-
             try {
                 // Loop until we've dispatched all the notifications in the queue.
+                while (true) {
+                    final Collection<N> notifications;
 
-                while( true ) {
-
-                    // Get the notification at the head of the queue, waiting a little bit for one
-                    // to get offered.
-
-                    N notification = notificationQueue.poll( 10, TimeUnit.MILLISECONDS );
-                    if( notification == null ) {
-
-                        // The queue is empty - try to get the queuingLock. If we can't get the lock
-                        // then #submitNotifications is in the process of offering to the queue so
-                        // we'll loop back up and poll the queue again.
-
-                        if( queuingLock.tryLock() ) {
-                            try {
-
-                                // Check the queuedNotifications flag to see if #submitNotifications
-                                // has offered new notification(s) to the queue. If so, loop back up
-                                // and poll the queue again. Otherwise set done to true and exit.
-                                // Once we set the done flag and unlock, calls to
-                                // #submitNotifications will fail and a new task will be created.
-
-                                if( !queuedNotifications ) {
-                                    done = true;
-                                    break;
-                                }
-
-                                // Clear the queuedNotifications flag so we'll try to exit the next
-                                // time through the loop when the queue is empty.
+                    lock.lock();
+                    try {
+                        if (!waitForQueue()) {
+                            exiting = true;
+                            break;
+                        }
 
-                                queuedNotifications = false;
+                        // Splice the entire queue
+                        notifications = ImmutableList.copyOf(queue);
+                        queue.clear();
 
-                            } finally {
-                                queuingLock.unlock();
-                            }
-                        }
+                        notFull.signalAll();
+                    } finally {
+                        lock.unlock();
                     }
 
-                    notifyListener( notification );
+                    invokeListener(notifications);
                 }
-            } catch( InterruptedException e ) {
-
-                // The executor is probably shutting down so log as debug.
-                LOG.debug( "{}: Interrupted trying to remove from {} listener's queue",
-                           name, listenerKey.getListener().getClass() );
             } finally {
-
                 // We're exiting, gracefully or not - either way make sure we always remove
                 // ourselves from the cache.
-
-                listenerCache.remove( listenerKey );
+                listenerCache.remove(listenerKey, this);
             }
         }
 
-        private void notifyListener( N notification ) {
-
-            if( notification == null ) {
-                return;
-            }
-
+        @SuppressWarnings("checkstyle:illegalCatch")
+        private void invokeListener(final Collection<N> notifications) {
+            LOG.debug("{}: Invoking listener {} with notification: {}", name, listenerKey, notifications);
             try {
-
-                if( LOG.isDebugEnabled() ) {
-                    LOG.debug( "{}: Invoking listener {} with notification: {}",
-                               name, listenerKey.getListener().getClass(), notification );
-                }
-
-                listenerInvoker.invokeListener( listenerKey.getListener(), notification );
-
-            } catch( RuntimeException e ) {
-
-                // We'll let a RuntimeException from the listener slide and keep sending any
-                // remaining notifications.
-
-                LOG.error( String.format( "%1$s: Error notifying listener %2$s", name,
-                           listenerKey.getListener().getClass() ), e );
-
-            } catch( Error e ) {
-
-                // A JVM Error is severe - best practice is to throw them up the chain. Set done to
-                // true so no new notifications can be added to this task as we're about to bail.
-
-                done = true;
-                throw e;
+                listenerInvoker.invokeListener(listenerKey.getListener(), notifications);
+            } catch (Exception e) {
+                // We'll let a RuntimeException from the listener slide and keep sending any remaining notifications.
+                LOG.error("{}: Error notifying listener {} with {}", name, listenerKey, notifications, e);
             }
         }
     }