Separate out {Identity,Equality}QueuedNotificationManager
[yangtools.git] / common / util / src / main / java / org / opendaylight / yangtools / util / concurrent / AbstractQueuedNotificationManager.java
1 /*
2  * Copyright (c) 2014 Brocade Communications Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.yangtools.util.concurrent;
9
10 import static java.util.Objects.requireNonNull;
11
12 import com.google.common.collect.ImmutableList;
13 import java.util.List;
14 import java.util.concurrent.Executor;
15 import java.util.stream.Collectors;
16 import org.eclipse.jdt.annotation.NonNull;
17 import org.eclipse.jdt.annotation.NonNullByDefault;
18 import org.eclipse.jdt.annotation.Nullable;
19 import org.opendaylight.yangtools.util.concurrent.QueuedNotificationManager.BatchedInvoker;
20
21 /**
22  * This class manages queuing and dispatching notifications for multiple listeners concurrently.
23  * Notifications are queued on a per-listener basis and dispatched serially to each listener via an
24  * {@link Executor}.
25  *
26  * <p>This class optimizes its memory footprint by only allocating and maintaining a queue and executor
27  * task for a listener when there are pending notifications. On the first notification(s), a queue
28  * is created and a task is submitted to the executor to dispatch the queue to the associated
29  * listener. Any subsequent notifications that occur before all previous notifications have been
30  * dispatched are appended to the existing queue. When all notifications have been dispatched, the
31  * queue and task are discarded.
32  *
33  * @author Thomas Pantelis
34  *
35  * @param <L> the listener type
36  * @param <N> the notification type
37  */
38 @NonNullByDefault
39 abstract class AbstractQueuedNotificationManager<T, L, N> extends AbstractBatchingExecutor<T, N>
40         implements NotificationManager<L, N> {
41
42     private final QueuedNotificationManagerMXBean mxBean = new QueuedNotificationManagerMXBeanImpl(this);
43     private final BatchedInvoker<L, N> listenerInvoker;
44
45     AbstractQueuedNotificationManager(final String name, final Executor executor, final int maxQueueCapacity,
46             final BatchedInvoker<L, N> listenerInvoker) {
47         super(name, executor, maxQueueCapacity);
48         this.listenerInvoker = requireNonNull(listenerInvoker);
49     }
50
51     /**
52      * Returns the {@link Executor} to used for notification tasks.
53      */
54     public final Executor getExecutor() {
55         return executor();
56     }
57
58     /**
59      * Returns the maximum listener queue capacity.
60      */
61     public final int getMaxQueueCapacity() {
62         return maxQueueCapacity();
63     }
64
65     /**
66      * Return an {@link QueuedNotificationManagerMXBean} tied to this instance.
67      *
68      * @return An QueuedNotificationManagerMXBean object.
69      */
70     public final QueuedNotificationManagerMXBean getMXBean() {
71         return mxBean;
72     }
73
74     /**
75      * Returns {@link ListenerNotificationQueueStats} instances for each current listener
76      * notification task in progress.
77      */
78     // FIXME: drop visibility to package-protected
79     public final List<ListenerNotificationQueueStats> getListenerNotificationQueueStats() {
80         return streamTasks().map(t -> new ListenerNotificationQueueStats(t.key().toString(), t.size()))
81                 .collect(Collectors.toList());
82     }
83
84     @Override
85     public final void submitNotification(final L listener, final N notification) {
86         if (listener != null && notification != null) {
87             submitTask(wrap(listener), notification);
88         }
89     }
90
91     @Override
92     public final void submitNotifications(final L listener, final @Nullable Iterable<N> notifications) {
93         if (listener != null && notifications != null) {
94             submitTasks(wrap(listener), notifications);
95         }
96     }
97
98     @Override
99     final void executeBatch(final T key, final @NonNull ImmutableList<N> tasks) {
100         listenerInvoker.invokeListener(unwrap(key), tasks);
101     }
102
103     abstract T wrap(L listener);
104
105     abstract L unwrap(T key);
106 }