Add batching to QueuedNotificationManager 93/46093/2
authorRobert Varga <rovarga@cisco.com>
Tue, 20 Sep 2016 22:57:00 +0000 (00:57 +0200)
committerRobert Varga <rovarga@cisco.com>
Thu, 22 Sep 2016 18:05:47 +0000 (20:05 +0200)
Introduce BatchedInvoker, which allows multiple events to be
delivered in one go.

Change-Id: I3d4d054543fa29842d67f6f21f28ff0a47ed135d
Signed-off-by: Robert Varga <rovarga@cisco.com>
common/util/src/main/java/org/opendaylight/yangtools/util/concurrent/QueuedNotificationManager.java

index c0fb5602fa89416534a641f71173d88ee2887261..21a63ca2532b6295bdd115194e3a6af2024af55e 100644 (file)
@@ -9,6 +9,8 @@
 package org.opendaylight.yangtools.util.concurrent;
 
 import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
 import java.util.concurrent.BlockingQueue;
@@ -21,6 +23,7 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
 import java.util.stream.Collectors;
+import javax.annotation.Nonnull;
 import javax.annotation.concurrent.GuardedBy;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -51,9 +54,12 @@ public class QueuedNotificationManager<L, N> implements NotificationManager<L, N
      *
      * @param <L> the listener type
      * @param <N> the notification type
+     *
+     * @deprecated Use {@link QueuedNotificationManager.BatchedInvoker} instead.
      */
+    @Deprecated
+    @FunctionalInterface
     public interface Invoker<L, N> {
-
         /**
          * Called to invoke a listener with a notification.
          *
@@ -63,6 +69,17 @@ public class QueuedNotificationManager<L, N> implements NotificationManager<L, N
         void invokeListener(L listener, N notification);
     }
 
+    @FunctionalInterface
+    public interface BatchedInvoker<L, N> {
+        /**
+         * Called to invoke a listener with a notification.
+         *
+         * @param listener the listener to invoke
+         * @param notifications notifications to send
+         */
+        void invokeListener(@Nonnull L listener, @Nonnull Collection<? extends N> notifications);
+    }
+
     private static final Logger LOG = LoggerFactory.getLogger(QueuedNotificationManager.class);
 
     /**
@@ -72,11 +89,20 @@ public class QueuedNotificationManager<L, N> implements NotificationManager<L, N
     private static final int MAX_NOTIFICATION_OFFER_ATTEMPTS = 10;
 
     private final ConcurrentMap<ListenerKey<L>, NotificationTask> listenerCache = new ConcurrentHashMap<>();
-    private final Invoker<L, N> listenerInvoker;
+    private final BatchedInvoker<L, N> listenerInvoker;
     private final Executor executor;
     private final String name;
     private final int maxQueueCapacity;
 
+    private QueuedNotificationManager(final Executor executor, final BatchedInvoker<L, N> listenerInvoker,
+            final int maxQueueCapacity, final String name) {
+        Preconditions.checkArgument(maxQueueCapacity > 0, "Invalid maxQueueCapacity %s must be > 0", maxQueueCapacity);
+        this.executor = Preconditions.checkNotNull(executor);
+        this.listenerInvoker = Preconditions.checkNotNull(listenerInvoker);
+        this.maxQueueCapacity = maxQueueCapacity;
+        this.name = Preconditions.checkNotNull(name);
+    }
+
     /**
      * Constructor.
      *
@@ -84,14 +110,28 @@ public class QueuedNotificationManager<L, N> implements NotificationManager<L, N
      * @param listenerInvoker the {@link Invoker} to use for invoking listeners
      * @param maxQueueCapacity the capacity of each listener queue
      * @param name the name of this instance for logging info
+     *
+     * @deprecated Use {@link #create(Executor, BatchedInvoker, int, String)} instead.
      */
+    @Deprecated
     public QueuedNotificationManager(final Executor executor, final Invoker<L, N> listenerInvoker,
             final int maxQueueCapacity, final String name) {
-        Preconditions.checkArgument(maxQueueCapacity > 0, "Invalid maxQueueCapacity %s must be > 0", maxQueueCapacity);
-        this.executor = Preconditions.checkNotNull(executor);
-        this.listenerInvoker = Preconditions.checkNotNull(listenerInvoker);
-        this.maxQueueCapacity = maxQueueCapacity;
-        this.name = Preconditions.checkNotNull(name);
+        this(executor, (BatchedInvoker<L, N>)(l, c) -> c.forEach(n -> listenerInvoker.invokeListener(l, n)),
+            maxQueueCapacity, name);
+        Preconditions.checkNotNull(listenerInvoker);
+    }
+
+    /**
+     * Create a new notification manager.
+     *
+     * @param executor the {@link Executor} to use for notification tasks
+     * @param listenerInvoker the {@link BatchedInvoker} to use for invoking listeners
+     * @param maxQueueCapacity the capacity of each listener queue
+     * @param name the name of this instance for logging info
+     */
+    public static <L, N> QueuedNotificationManager<L, N> create(final Executor executor,
+            final BatchedInvoker<L, N> listenerInvoker, final int maxQueueCapacity, final String name) {
+        return new QueuedNotificationManager<>(executor, listenerInvoker, maxQueueCapacity, name);
     }
 
     /* (non-Javadoc)
@@ -359,7 +399,7 @@ public class QueuedNotificationManager<L, N> implements NotificationManager<L, N
 
             LOG.debug("{}: Invoking listener {} with notification: {}", name, listenerKey, notification);
             try {
-                listenerInvoker.invokeListener(listenerKey.getListener(), notification);
+                listenerInvoker.invokeListener(listenerKey.getListener(), ImmutableList.of(notification));
             } catch (Exception e) {
                 // We'll let a RuntimeException from the listener slide and keep sending any remaining notifications.
                 LOG.error(String.format("%1$s: Error notifying listener %2$s", name, listenerKey), e);