Allow QueuedNotificationManager to batch notifications
[yangtools.git] / common / util / src / main / java / org / opendaylight / yangtools / util / concurrent / QueuedNotificationManager.java
index 21a63ca2532b6295bdd115194e3a6af2024af55e..5160efe9bf7ccd785a9455335973605906999f61 100644 (file)
@@ -10,16 +10,18 @@ package org.opendaylight.yangtools.util.concurrent;
 
 import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableList;
+import java.util.ArrayDeque;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.Iterator;
 import java.util.List;
-import java.util.concurrent.BlockingQueue;
+import java.util.Queue;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.Executor;
-import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Condition;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
 import java.util.stream.Collectors;
@@ -86,7 +88,9 @@ public class QueuedNotificationManager<L, N> implements NotificationManager<L, N
      * Caps the maximum number of attempts to offer notification to a particular listener.  Each
      * attempt window is 1 minute, so an offer times out after roughly 10 minutes.
      */
-    private static final int MAX_NOTIFICATION_OFFER_ATTEMPTS = 10;
+    private static final int MAX_NOTIFICATION_OFFER_MINUTES = 10;
+    private static final long GIVE_UP_NANOS = TimeUnit.MINUTES.toNanos(MAX_NOTIFICATION_OFFER_MINUTES);
+    private static final long TASK_WAIT_NANOS = TimeUnit.MILLISECONDS.toNanos(10);
 
     private final ConcurrentMap<ListenerKey<L>, NotificationTask> listenerCache = new ConcurrentHashMap<>();
     private final BatchedInvoker<L, N> listenerInvoker;
@@ -116,8 +120,14 @@ public class QueuedNotificationManager<L, N> implements NotificationManager<L, N
     @Deprecated
     public QueuedNotificationManager(final Executor executor, final Invoker<L, N> listenerInvoker,
             final int maxQueueCapacity, final String name) {
-        this(executor, (BatchedInvoker<L, N>)(l, c) -> c.forEach(n -> listenerInvoker.invokeListener(l, n)),
-            maxQueueCapacity, name);
+        this(executor, (BatchedInvoker<L, N>)(l, c) -> c.forEach(n -> {
+            try {
+                listenerInvoker.invokeListener(l, n);
+            } catch (Exception e) {
+                LOG.error("{}: Error notifying listener {} with {}", name, l, n, e);
+            }
+
+        }), maxQueueCapacity, name);
         Preconditions.checkNotNull(listenerInvoker);
     }
 
@@ -134,6 +144,20 @@ public class QueuedNotificationManager<L, N> implements NotificationManager<L, N
         return new QueuedNotificationManager<>(executor, listenerInvoker, maxQueueCapacity, name);
     }
 
+    /**
+     * Returns the maximum listener queue capacity.
+     */
+    public int getMaxQueueCapacity() {
+        return maxQueueCapacity;
+    }
+
+    /**
+     * Returns the {@link Executor} to used for notification tasks.
+     */
+    public Executor getExecutor() {
+        return executor;
+    }
+
     /* (non-Javadoc)
      * @see org.opendaylight.yangtools.util.concurrent.NotificationManager#addNotification(L, N)
      */
@@ -163,38 +187,47 @@ public class QueuedNotificationManager<L, N> implements NotificationManager<L, N
         // add our notifications to an existing NotificationTask. Eventually one or the other
         // will occur.
         try {
-            NotificationTask newNotificationTask = null;
+            Iterator<N> it = notifications.iterator();
 
             while (true) {
-                final NotificationTask existingTask = listenerCache.get(key);
-                if (existingTask != null && existingTask.submitNotifications(notifications)) {
-                    // We were able to add our notifications to an existing task so we're done.
-                    break;
+                NotificationTask task = listenerCache.get(key);
+                if (task == null) {
+                    // No task found, try to insert a new one
+                    final NotificationTask newTask = new NotificationTask(key, it);
+                    task = listenerCache.putIfAbsent(key, newTask);
+                    if (task == null) {
+                        // We were able to put our new task - now submit it to the executor and
+                        // we're done. If it throws a RejectedExecutionException, let that propagate
+                        // to the caller.
+                        runTask(listener, newTask);
+                        break;
+                    }
+
+                    // We have a racing task, hence we can continue, but we need to refresh our iterator from
+                    // the task.
+                    it = newTask.recoverItems();
                 }
 
-                // Either there's no existing task or we couldn't add our notifications to the
-                // existing one because it's in the process of exiting and removing itself from
-                // the cache. Either way try to put a new task in the cache. If we can't put
-                // then either the existing one is still there and hasn't removed itself quite
-                // yet or some other concurrent thread beat us to the put although this method
-                // shouldn't be called concurrently for the same listener as that would violate
-                // notification ordering. In any case loop back up and try again.
+                final boolean completed = task.submitNotifications(it);
+                if (!completed) {
+                    // Task is indicating it is exiting before it has consumed all the items and is exiting. Rather
+                    // than spinning on removal, we try to replace it.
+                    final NotificationTask newTask = new NotificationTask(key, it);
+                    if (listenerCache.replace(key, task, newTask)) {
+                        runTask(listener, newTask);
+                        break;
+                    }
 
-                if (newNotificationTask == null) {
-                    newNotificationTask = new NotificationTask(key, notifications);
-                }
-                final NotificationTask oldTask = listenerCache.putIfAbsent(key, newNotificationTask);
-                if (oldTask == null) {
-                    // We were able to put our new task - now submit it to the executor and
-                    // we're done. If it throws a RejectedxecutionException, let that propagate
-                    // to the caller.
-
-                    LOG.debug("{}: Submitting NotificationTask for listener {}", name, listener);
-                    executor.execute(newNotificationTask);
-                    break;
+                    // We failed to replace the task, hence we need retry. Note we have to recover the items to be
+                    // published from the new task.
+                    it = newTask.recoverItems();
+                    LOG.debug("{}: retrying task queueing for {}", name, listener);
+                    continue;
                 }
 
-                LOG.debug("{}: retrying task queueing for {}", name, listener);
+                // All notifications have either been delivered or we have timed out and warned about the ones we
+                // have failed to deliver. In any case we are done here.
+                break;
             }
         } catch (InterruptedException e) {
             // We were interrupted trying to offer to the listener's queue. Somebody's probably
@@ -202,7 +235,7 @@ public class QueuedNotificationManager<L, N> implements NotificationManager<L, N
             LOG.warn("{}: Interrupted trying to add to {} listener's queue", name, listener);
         }
 
-        LOG.trace("{}: submitNotifications dine for listener {}", name, listener);
+        LOG.trace("{}: submitNotifications done for listener {}", name, listener);
     }
 
     /**
@@ -211,21 +244,12 @@ public class QueuedNotificationManager<L, N> implements NotificationManager<L, N
      */
     public List<ListenerNotificationQueueStats> getListenerNotificationQueueStats() {
         return listenerCache.values().stream().map(t -> new ListenerNotificationQueueStats(t.listenerKey.toString(),
-            t.notificationQueue.size())).collect(Collectors.toList());
+            t.size())).collect(Collectors.toList());
     }
 
-    /**
-     * Returns the maximum listener queue capacity.
-     */
-    public int getMaxQueueCapacity() {
-        return maxQueueCapacity;
-    }
-
-    /**
-     * Returns the {@link Executor} to used for notification tasks.
-     */
-    public Executor getExecutor() {
-        return executor;
+    private void runTask(final L listener, final NotificationTask task) {
+        LOG.debug("{}: Submitting NotificationTask for listener {}", name, listener);
+        executor.execute(task);
     }
 
     /**
@@ -270,70 +294,101 @@ public class QueuedNotificationManager<L, N> implements NotificationManager<L, N
      * listener.
      */
     private class NotificationTask implements Runnable {
-        private final Lock queuingLock = new ReentrantLock();
-        private final BlockingQueue<N> notificationQueue;
+
+        private final Lock lock = new ReentrantLock();
+        private final Condition notEmpty = lock.newCondition();
+        private final Condition notFull = lock.newCondition();
         private final ListenerKey<L> listenerKey;
 
-        @GuardedBy("queuingLock")
-        private boolean queuedNotifications = false;
-        private volatile boolean done = false;
+        @GuardedBy("lock")
+        private final Queue<N> queue = new ArrayDeque<>();
+        @GuardedBy("lock")
+        private boolean exiting;
 
-        NotificationTask(final ListenerKey<L> listenerKey, final Iterable<N> notifications) {
+        NotificationTask(final ListenerKey<L> listenerKey, final Iterator<N> notifications) {
             this.listenerKey = Preconditions.checkNotNull(listenerKey);
-            this.notificationQueue = new LinkedBlockingQueue<>(maxQueueCapacity);
+            while (notifications.hasNext()) {
+                queue.offer(notifications.next());
+            }
+        }
 
-            for (N notification: notifications) {
-                this.notificationQueue.add(notification);
+        Iterator<N> recoverItems() {
+            // This violates @GuardedBy annotation, but is invoked only when the task is not started and will never
+            // get started, hence this is safe.
+            return queue.iterator();
+        }
+
+        int size() {
+            lock.lock();
+            try {
+                return queue.size();
+            } finally {
+                lock.unlock();
             }
         }
 
-        @GuardedBy("queuingLock")
-        private void publishNotification(final N notification) throws InterruptedException {
-            // The offer is attempted for up to 10 minutes, with a status message printed each minute
-            for (int notificationOfferAttempts = 0;
-                 notificationOfferAttempts < MAX_NOTIFICATION_OFFER_ATTEMPTS; notificationOfferAttempts++) {
+        boolean submitNotifications(final Iterator<N> notifications) throws InterruptedException {
+            final long start = System.nanoTime();
+            final long deadline = start + GIVE_UP_NANOS;
 
-                // Try to offer for up to a minute and log a message if it times out.
-                LOG.debug("{}: Offering notification to the queue for listener {}: {}", name, listenerKey,
-                    notification);
+            lock.lock();
+            try {
+                // Lock may have blocked for some time, we need to take that into account. We may have exceedded
+                // the deadline, but that is unlikely and even in that case we can make some progress without further
+                // blocking.
+                long canWait = deadline - System.nanoTime();
 
-                if (notificationQueue.offer(notification, 1, TimeUnit.MINUTES)) {
-                    return;
-                }
+                while (true) {
+                    // Check the exiting flag - if true then #run is in the process of exiting so return
+                    // false to indicate such. Otherwise, offer the notifications to the queue.
+                    if (exiting) {
+                        return false;
+                    }
 
-                LOG.warn("{}: Timed out trying to offer a notification to the queue for listener {} "
-                        + "on attempt {} of {}. The queue has reached its capacity of {}", name, listenerKey,
-                        notificationOfferAttempts, MAX_NOTIFICATION_OFFER_ATTEMPTS, maxQueueCapacity);
-            }
+                    final int avail = maxQueueCapacity - queue.size();
+                    if (avail <= 0) {
+                        if (canWait <= 0) {
+                            LOG.warn("{}: Failed to offer notifications {} to the queue for listener {}. Exceeded"
+                                + "maximum allowable time of {} minutes; the listener is likely in an unrecoverable"
+                                + "state (deadlock or endless loop). ", name, ImmutableList.copyOf(notifications),
+                                listenerKey, MAX_NOTIFICATION_OFFER_MINUTES);
+                            return true;
+                        }
 
-            LOG.warn("{}: Failed to offer a notification to the queue for listener {}. Exceeded max allowable attempts"
-                    + " of {} in {} minutes; the listener is likely in an unrecoverable state (deadlock or endless"
-                    + " loop).", name, listenerKey, MAX_NOTIFICATION_OFFER_ATTEMPTS, MAX_NOTIFICATION_OFFER_ATTEMPTS);
-        }
+                        canWait = notFull.awaitNanos(canWait);
+                        continue;
+                    }
 
-        boolean submitNotifications(final Iterable<N> notifications) throws InterruptedException {
+                    for (int i = 0; i < avail; ++i) {
+                        if (!notifications.hasNext()) {
+                            notEmpty.signal();
+                            return true;
+                        }
 
-            queuingLock.lock();
-            try {
+                        queue.offer(notifications.next());
+                    }
+                }
+            } finally {
+                lock.unlock();
+            }
+        }
 
-                // Check the done flag - if true then #run is in the process of exiting so return
-                // false to indicate such. Otherwise, offer the notifications to the queue.
+        @GuardedBy("lock")
+        private boolean waitForQueue() {
+            long timeout = TASK_WAIT_NANOS;
 
-                if (done) {
+            while (queue.isEmpty()) {
+                if (timeout <= 0) {
                     return false;
                 }
 
-                for (N notification : notifications) {
-                    publishNotification(notification);
+                try {
+                    timeout = notEmpty.awaitNanos(timeout);
+                } catch (InterruptedException e) {
+                    // The executor is probably shutting down so log as debug.
+                    LOG.debug("{}: Interrupted trying to remove from {} listener's queue", name, listenerKey);
+                    return false;
                 }
-
-                // Set the queuedNotifications flag to tell #run that we've just queued
-                // notifications and not to exit yet, even if it thinks the queue is empty at this
-                // point.
-
-                queuedNotifications = true;
-            } finally {
-                queuingLock.unlock();
             }
 
             return true;
@@ -343,48 +398,27 @@ public class QueuedNotificationManager<L, N> implements NotificationManager<L, N
         public void run() {
             try {
                 // Loop until we've dispatched all the notifications in the queue.
-
                 while (true) {
-                    // Get the notification at the head of the queue, waiting a little bit for one
-                    // to get offered.
-
-                    final N notification = notificationQueue.poll(10, TimeUnit.MILLISECONDS);
-                    if (notification == null) {
+                    final Collection<N> notifications;
 
-                        // The queue is empty - try to get the queuingLock. If we can't get the lock
-                        // then #submitNotifications is in the process of offering to the queue so
-                        // we'll loop back up and poll the queue again.
-
-                        if (queuingLock.tryLock()) {
-                            try {
-
-                                // Check the queuedNotifications flag to see if #submitNotifications
-                                // has offered new notification(s) to the queue. If so, loop back up
-                                // and poll the queue again. Otherwise set done to true and exit.
-                                // Once we set the done flag and unlock, calls to
-                                // #submitNotifications will fail and a new task will be created.
-
-                                if (!queuedNotifications) {
-                                    done = true;
-                                    break;
-                                }
-
-                                // Clear the queuedNotifications flag so we'll try to exit the next
-                                // time through the loop when the queue is empty.
+                    lock.lock();
+                    try {
+                        if (!waitForQueue()) {
+                            exiting = true;
+                            break;
+                        }
 
-                                queuedNotifications = false;
+                        // Splice the entire queue
+                        notifications = ImmutableList.copyOf(queue);
+                        queue.clear();
 
-                            } finally {
-                                queuingLock.unlock();
-                            }
-                        }
+                        notFull.signalAll();
+                    } finally {
+                        lock.unlock();
                     }
 
-                    notifyListener(notification);
+                    invokeListener(notifications);
                 }
-            } catch (InterruptedException e) {
-                // The executor is probably shutting down so log as debug.
-                LOG.debug("{}: Interrupted trying to remove from {} listener's queue", name, listenerKey);
             } finally {
                 // We're exiting, gracefully or not - either way make sure we always remove
                 // ourselves from the cache.
@@ -392,17 +426,13 @@ public class QueuedNotificationManager<L, N> implements NotificationManager<L, N
             }
         }
 
-        private void notifyListener(final N notification) {
-            if (notification == null) {
-                return;
-            }
-
-            LOG.debug("{}: Invoking listener {} with notification: {}", name, listenerKey, notification);
+        private void invokeListener(final Collection<N> notifications) {
+            LOG.debug("{}: Invoking listener {} with notification: {}", name, listenerKey, notifications);
             try {
-                listenerInvoker.invokeListener(listenerKey.getListener(), ImmutableList.of(notification));
+                listenerInvoker.invokeListener(listenerKey.getListener(), notifications);
             } catch (Exception e) {
                 // We'll let a RuntimeException from the listener slide and keep sending any remaining notifications.
-                LOG.error(String.format("%1$s: Error notifying listener %2$s", name, listenerKey), e);
+                LOG.error("{}: Error notifying listener {} with {}", name, listenerKey, notifications, e);
             }
         }
     }