2 * Copyright (c) 2014 Brocade Communications Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.yangtools.util.concurrent;
10 import static java.util.Objects.requireNonNull;
12 import com.google.common.collect.ImmutableList;
13 import java.util.List;
14 import java.util.concurrent.Executor;
15 import java.util.stream.Collectors;
16 import org.eclipse.jdt.annotation.NonNull;
17 import org.eclipse.jdt.annotation.NonNullByDefault;
18 import org.eclipse.jdt.annotation.Nullable;
19 import org.opendaylight.yangtools.util.concurrent.QueuedNotificationManager.BatchedInvoker;
22 * This class manages queuing and dispatching notifications for multiple listeners concurrently.
23 * Notifications are queued on a per-listener basis and dispatched serially to each listener via an {@link Executor}.
26 * <p>This class optimizes its memory footprint by only allocating and maintaining a queue and executor
27 * task for a listener when there are pending notifications. On the first notification(s), a queue
28 * is created and a task is submitted to the executor to dispatch the queue to the associated
29 * listener. Any subsequent notifications that occur before all previous notifications have been
30 * dispatched are appended to the existing queue. When all notifications have been dispatched, the
31 * queue and task are discarded.
33 * @author Thomas Pantelis
35 * @param <L> the listener type
36 * @param <N> the notification type
39 abstract class AbstractQueuedNotificationManager<T, L, N> extends AbstractBatchingExecutor<T, N>
40 implements NotificationManager<L, N> {
42 private final QueuedNotificationManagerMXBean mxBean = new QueuedNotificationManagerMXBeanImpl(this);
43 private final BatchedInvoker<L, N> listenerInvoker;
/**
 * Creates a manager, forwarding the name, executor and queue capacity to the superclass.
 *
 * @param name passed through to the superclass constructor
 * @param executor passed through to the superclass constructor
 * @param maxQueueCapacity passed through to the superclass constructor
 * @param listenerInvoker invoker used to deliver batches of notifications to listeners
 * @throws NullPointerException if {@code listenerInvoker} is null
 */
AbstractQueuedNotificationManager(
        final String name,
        final Executor executor,
        final int maxQueueCapacity,
        final BatchedInvoker<L, N> listenerInvoker) {
    super(name, executor, maxQueueCapacity);
    this.listenerInvoker = requireNonNull(listenerInvoker);
}
52 * Returns the {@link Executor} used for notification tasks.
54 public final Executor getExecutor() {
59 * Returns the maximum listener queue capacity.
61 public final int getMaxQueueCapacity() {
62 return maxQueueCapacity();
66 * Returns a {@link QueuedNotificationManagerMXBean} tied to this instance.
68 * @return a QueuedNotificationManagerMXBean object.
70 public final QueuedNotificationManagerMXBean getMXBean() {
75 * Returns {@link ListenerNotificationQueueStats} instances for each current listener
76 * notification task in progress.
78 // FIXME: drop visibility to package-protected
79 public final List<ListenerNotificationQueueStats> getListenerNotificationQueueStats() {
80 return streamTasks().map(t -> new ListenerNotificationQueueStats(t.key().toString(), t.size()))
81 .collect(Collectors.toList());
85 public final void submitNotification(final L listener, final N notification) {
86 if (listener != null && notification != null) {
87 submitTask(wrap(listener), notification);
92 public final void submitNotifications(final L listener, final @Nullable Iterable<N> notifications) {
93 if (listener != null && notifications != null) {
94 submitTasks(wrap(listener), notifications);
99 final void executeBatch(final T key, final @NonNull ImmutableList<N> tasks) {
100 listenerInvoker.invokeListener(unwrap(key), tasks);
103 abstract T wrap(L listener);
105 abstract L unwrap(T key);