From 21782e414bc65f82975f62fb560c12f23c5d2acb Mon Sep 17 00:00:00 2001 From: Robert Varga Date: Wed, 21 Sep 2016 00:57:00 +0200 Subject: [PATCH] Add batching to QueuedNotificationManager Introduce BatchedInvoker, which allows multiple events to be delivered in one go. Change-Id: I3d4d054543fa29842d67f6f21f28ff0a47ed135d Signed-off-by: Robert Varga (cherry picked from commit 8aaf251e7eb6616384794e31bcd107ffe8fdba2e) --- .../concurrent/QueuedNotificationManager.java | 56 ++++++++++++++++--- 1 file changed, 48 insertions(+), 8 deletions(-) diff --git a/common/util/src/main/java/org/opendaylight/yangtools/util/concurrent/QueuedNotificationManager.java b/common/util/src/main/java/org/opendaylight/yangtools/util/concurrent/QueuedNotificationManager.java index c0fb5602fa..21a63ca253 100644 --- a/common/util/src/main/java/org/opendaylight/yangtools/util/concurrent/QueuedNotificationManager.java +++ b/common/util/src/main/java/org/opendaylight/yangtools/util/concurrent/QueuedNotificationManager.java @@ -9,6 +9,8 @@ package org.opendaylight.yangtools.util.concurrent; import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableList; +import java.util.Collection; import java.util.Collections; import java.util.List; import java.util.concurrent.BlockingQueue; @@ -21,6 +23,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; import java.util.stream.Collectors; +import javax.annotation.Nonnull; import javax.annotation.concurrent.GuardedBy; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -51,9 +54,12 @@ public class QueuedNotificationManager implements NotificationManager the listener type * @param the notification type + * + * @deprecated Use {@link QueuedNotificationManager.BatchedInvoker} instead. */ + @Deprecated + @FunctionalInterface public interface Invoker { - /** * Called to invoke a listener with a notification. * @@ -63,6 +69,17 @@ public class QueuedNotificationManager implements NotificationManager { + /** + * Called to invoke a listener with a notification. + * + * @param listener the listener to invoke + * @param notifications notifications to send + */ + void invokeListener(@Nonnull L listener, @Nonnull Collection notifications); + } + private static final Logger LOG = LoggerFactory.getLogger(QueuedNotificationManager.class); /** @@ -72,11 +89,20 @@ public class QueuedNotificationManager implements NotificationManager, NotificationTask> listenerCache = new ConcurrentHashMap<>(); - private final Invoker listenerInvoker; + private final BatchedInvoker listenerInvoker; private final Executor executor; private final String name; private final int maxQueueCapacity; + private QueuedNotificationManager(final Executor executor, final BatchedInvoker listenerInvoker, + final int maxQueueCapacity, final String name) { + Preconditions.checkArgument(maxQueueCapacity > 0, "Invalid maxQueueCapacity %s must be > 0", maxQueueCapacity); + this.executor = Preconditions.checkNotNull(executor); + this.listenerInvoker = Preconditions.checkNotNull(listenerInvoker); + this.maxQueueCapacity = maxQueueCapacity; + this.name = Preconditions.checkNotNull(name); + } + /** * Constructor. * @@ -84,14 +110,28 @@ public class QueuedNotificationManager implements NotificationManager listenerInvoker, final int maxQueueCapacity, final String name) { - Preconditions.checkArgument(maxQueueCapacity > 0, "Invalid maxQueueCapacity %s must be > 0", maxQueueCapacity); - this.executor = Preconditions.checkNotNull(executor); - this.listenerInvoker = Preconditions.checkNotNull(listenerInvoker); - this.maxQueueCapacity = maxQueueCapacity; - this.name = Preconditions.checkNotNull(name); + this(executor, (BatchedInvoker)(l, c) -> c.forEach(n -> listenerInvoker.invokeListener(l, n)), + maxQueueCapacity, name); + Preconditions.checkNotNull(listenerInvoker); + } + + /** + * Create a new notification manager. + * + * @param executor the {@link Executor} to use for notification tasks + * @param listenerInvoker the {@link BatchedInvoker} to use for invoking listeners + * @param maxQueueCapacity the capacity of each listener queue + * @param name the name of this instance for logging info + */ + public static QueuedNotificationManager create(final Executor executor, + final BatchedInvoker listenerInvoker, final int maxQueueCapacity, final String name) { + return new QueuedNotificationManager<>(executor, listenerInvoker, maxQueueCapacity, name); } /* (non-Javadoc) @@ -359,7 +399,7 @@ public class QueuedNotificationManager implements NotificationManager