package org.opendaylight.yangtools.util.concurrent;
import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
+import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Collectors;
+import javax.annotation.Nonnull;
import javax.annotation.concurrent.GuardedBy;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
*
* @param <L> the listener type
* @param <N> the notification type
+ *
+ * @deprecated Use {@link QueuedNotificationManager.BatchedInvoker} instead.
*/
+ @Deprecated
+ @FunctionalInterface
public interface Invoker<L, N> {
-
/**
* Called to invoke a listener with a notification.
*
void invokeListener(L listener, N notification);
}
+ @FunctionalInterface
+ public interface BatchedInvoker<L, N> {
+ /**
+ * Called to invoke a listener with a notification.
+ *
+ * @param listener the listener to invoke
+ * @param notifications notifications to send
+ */
+ void invokeListener(@Nonnull L listener, @Nonnull Collection<? extends N> notifications);
+ }
+
private static final Logger LOG = LoggerFactory.getLogger(QueuedNotificationManager.class);
/**
private static final int MAX_NOTIFICATION_OFFER_ATTEMPTS = 10;
private final ConcurrentMap<ListenerKey<L>, NotificationTask> listenerCache = new ConcurrentHashMap<>();
- private final Invoker<L, N> listenerInvoker;
+ private final BatchedInvoker<L, N> listenerInvoker;
private final Executor executor;
private final String name;
private final int maxQueueCapacity;
+ private QueuedNotificationManager(final Executor executor, final BatchedInvoker<L, N> listenerInvoker,
+ final int maxQueueCapacity, final String name) {
+ Preconditions.checkArgument(maxQueueCapacity > 0, "Invalid maxQueueCapacity %s must be > 0", maxQueueCapacity);
+ this.executor = Preconditions.checkNotNull(executor);
+ this.listenerInvoker = Preconditions.checkNotNull(listenerInvoker);
+ this.maxQueueCapacity = maxQueueCapacity;
+ this.name = Preconditions.checkNotNull(name);
+ }
+
/**
* Constructor.
*
* @param listenerInvoker the {@link Invoker} to use for invoking listeners
* @param maxQueueCapacity the capacity of each listener queue
* @param name the name of this instance for logging info
+ *
+ * @deprecated Use {@link #create(Executor, BatchedInvoker, int, String)} instead.
*/
+ @Deprecated
public QueuedNotificationManager(final Executor executor, final Invoker<L, N> listenerInvoker,
final int maxQueueCapacity, final String name) {
- Preconditions.checkArgument(maxQueueCapacity > 0, "Invalid maxQueueCapacity %s must be > 0", maxQueueCapacity);
- this.executor = Preconditions.checkNotNull(executor);
- this.listenerInvoker = Preconditions.checkNotNull(listenerInvoker);
- this.maxQueueCapacity = maxQueueCapacity;
- this.name = Preconditions.checkNotNull(name);
+ this(executor, (BatchedInvoker<L, N>)(l, c) -> c.forEach(n -> listenerInvoker.invokeListener(l, n)),
+ maxQueueCapacity, name);
+ Preconditions.checkNotNull(listenerInvoker);
+ }
+
+ /**
+ * Create a new notification manager.
+ *
+ * @param executor the {@link Executor} to use for notification tasks
+ * @param listenerInvoker the {@link BatchedInvoker} to use for invoking listeners
+ * @param maxQueueCapacity the capacity of each listener queue
+ * @param name the name of this instance for logging info
+ */
+ public static <L, N> QueuedNotificationManager<L, N> create(final Executor executor,
+ final BatchedInvoker<L, N> listenerInvoker, final int maxQueueCapacity, final String name) {
+ return new QueuedNotificationManager<>(executor, listenerInvoker, maxQueueCapacity, name);
}
/* (non-Javadoc)
LOG.debug("{}: Invoking listener {} with notification: {}", name, listenerKey, notification);
try {
- listenerInvoker.invokeListener(listenerKey.getListener(), notification);
+ listenerInvoker.invokeListener(listenerKey.getListener(), ImmutableList.of(notification));
} catch (Exception e) {
// We'll let a RuntimeException from the listener slide and keep sending any remaining notifications.
LOG.error(String.format("%1$s: Error notifying listener %2$s", name, listenerKey), e);