2 * Copyright (c) 2014 Brocade Communications Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.yangtools.util.concurrent;
10 import static com.google.common.base.Preconditions.checkArgument;
11 import static java.util.Objects.requireNonNull;
13 import com.google.common.collect.ImmutableList;
14 import java.util.ArrayDeque;
15 import java.util.Collections;
16 import java.util.Iterator;
17 import java.util.List;
18 import java.util.Queue;
19 import java.util.concurrent.ConcurrentHashMap;
20 import java.util.concurrent.ConcurrentMap;
21 import java.util.concurrent.Executor;
22 import java.util.concurrent.TimeUnit;
23 import java.util.concurrent.locks.Condition;
24 import java.util.concurrent.locks.Lock;
25 import java.util.concurrent.locks.ReentrantLock;
26 import java.util.stream.Collectors;
27 import org.checkerframework.checker.lock.qual.GuardedBy;
28 import org.eclipse.jdt.annotation.NonNull;
29 import org.opendaylight.yangtools.util.ForwardingIdentityObject;
30 import org.slf4j.Logger;
31 import org.slf4j.LoggerFactory;
34 * This class manages queuing and dispatching notifications for multiple listeners concurrently.
35 * Notifications are queued on a per-listener basis and dispatched serially to each listener via an
38 * <p>This class optimizes its memory footprint by only allocating and maintaining a queue and executor
39 * task for a listener when there are pending notifications. On the first notification(s), a queue
40 * is created and a task is submitted to the executor to dispatch the queue to the associated
41 * listener. Any subsequent notifications that occur before all previous notifications have been
42 * dispatched are appended to the existing queue. When all notifications have been dispatched, the
43 * queue and task are discarded.
45 * @author Thomas Pantelis
47 * @param <L> the listener type
48 * @param <N> the notification type
50 public final class QueuedNotificationManager<L, N> implements NotificationManager<L, N> {
52 public interface BatchedInvoker<L, N> {
54 * Called to invoke a listener with a notification.
56 * @param listener the listener to invoke
57 * @param notifications notifications to send
59 void invokeListener(@NonNull L listener, @NonNull ImmutableList<N> notifications);
62 private static final Logger LOG = LoggerFactory.getLogger(QueuedNotificationManager.class);
65 * Caps the maximum number of attempts to offer notification to a particular listener. Each
66 * attempt window is 1 minute, so an offer times out after roughly 10 minutes.
68 private static final int MAX_NOTIFICATION_OFFER_MINUTES = 10;
69 private static final long GIVE_UP_NANOS = TimeUnit.MINUTES.toNanos(MAX_NOTIFICATION_OFFER_MINUTES);
70 private static final long TASK_WAIT_NANOS = TimeUnit.MILLISECONDS.toNanos(10);
73 * We key by listener reference identity hashCode/equals.
74 * Since we don't know anything about the listener class implementations and we're mixing
75 * multiple listener class instances in the same map, this avoids any potential issue with an
76 * equals implementation that just blindly casts the other Object to compare instead of checking
79 private final ConcurrentMap<ForwardingIdentityObject<L>, NotificationTask> listenerCache =
80 new ConcurrentHashMap<>();
81 private final @NonNull QueuedNotificationManagerMXBean mxBean = new QueuedNotificationManagerMXBeanImpl(this);
82 private final @NonNull BatchedInvoker<L, N> listenerInvoker;
83 private final @NonNull Executor executor;
84 private final @NonNull String name;
85 private final int maxQueueCapacity;
87 private QueuedNotificationManager(final @NonNull Executor executor,
88 final @NonNull BatchedInvoker<L, N> listenerInvoker, final int maxQueueCapacity,
89 final @NonNull String name) {
90 checkArgument(maxQueueCapacity > 0, "Invalid maxQueueCapacity %s must be > 0", maxQueueCapacity);
91 this.executor = requireNonNull(executor);
92 this.listenerInvoker = requireNonNull(listenerInvoker);
93 this.maxQueueCapacity = maxQueueCapacity;
94 this.name = requireNonNull(name);
98 * Create a new notification manager.
100 * @param executor the {@link Executor} to use for notification tasks
101 * @param listenerInvoker the {@link BatchedInvoker} to use for invoking listeners
102 * @param maxQueueCapacity the capacity of each listener queue
103 * @param name the name of this instance for logging info
105 public static <L, N> QueuedNotificationManager<L, N> create(final @NonNull Executor executor,
106 final@NonNull BatchedInvoker<L, N> listenerInvoker, final int maxQueueCapacity,
107 final @NonNull String name) {
108 return new QueuedNotificationManager<>(executor, listenerInvoker, maxQueueCapacity, name);
112 * Returns the maximum listener queue capacity.
114 public int getMaxQueueCapacity() {
115 return maxQueueCapacity;
119 * Return an {@link QueuedNotificationManagerMXBean} tied to this instance.
121 * @return An QueuedNotificationManagerMXBean object.
123 public @NonNull QueuedNotificationManagerMXBean getMXBean() {
128 * Returns the {@link Executor} to used for notification tasks.
130 public @NonNull Executor getExecutor() {
135 * @see org.opendaylight.yangtools.util.concurrent.NotificationManager#addNotification(L, N)
138 public void submitNotification(final L listener, final N notification) {
139 if (notification != null) {
140 submitNotifications(listener, Collections.singletonList(notification));
145 * @see org.opendaylight.yangtools.util.concurrent.NotificationManager#submitNotifications(L, java.util.Collection)
148 public void submitNotifications(final L listener, final Iterable<N> notifications) {
150 if (notifications == null || listener == null) {
154 LOG.trace("{}: submitNotifications for listener {}: {}", name, listener, notifications);
156 final ForwardingIdentityObject<L> key = ForwardingIdentityObject.of(listener);
158 // Keep looping until we are either able to add a new NotificationTask or are able to
159 // add our notifications to an existing NotificationTask. Eventually one or the other
162 Iterator<N> it = notifications.iterator();
165 NotificationTask task = listenerCache.get(key);
167 // No task found, try to insert a new one
168 final NotificationTask newTask = new NotificationTask(key, it);
169 task = listenerCache.putIfAbsent(key, newTask);
171 // We were able to put our new task - now submit it to the executor and
172 // we're done. If it throws a RejectedExecutionException, let that propagate
174 runTask(listener, newTask);
178 // We have a racing task, hence we can continue, but we need to refresh our iterator from
180 it = newTask.recoverItems();
183 final boolean completed = task.submitNotifications(it);
185 // Task is indicating it is exiting before it has consumed all the items and is exiting. Rather
186 // than spinning on removal, we try to replace it.
187 final NotificationTask newTask = new NotificationTask(key, it);
188 if (listenerCache.replace(key, task, newTask)) {
189 runTask(listener, newTask);
193 // We failed to replace the task, hence we need retry. Note we have to recover the items to be
194 // published from the new task.
195 it = newTask.recoverItems();
196 LOG.debug("{}: retrying task queueing for {}", name, listener);
200 // All notifications have either been delivered or we have timed out and warned about the ones we
201 // have failed to deliver. In any case we are done here.
204 } catch (InterruptedException e) {
205 // We were interrupted trying to offer to the listener's queue. Somebody's probably
206 // telling us to quit.
207 LOG.warn("{}: Interrupted trying to add to {} listener's queue", name, listener);
210 LOG.trace("{}: submitNotifications done for listener {}", name, listener);
214 * Returns {@link ListenerNotificationQueueStats} instances for each current listener
215 * notification task in progress.
217 public List<ListenerNotificationQueueStats> getListenerNotificationQueueStats() {
218 return listenerCache.values().stream().map(t -> new ListenerNotificationQueueStats(t.listenerKey.toString(),
219 t.size())).collect(Collectors.toList());
222 private void runTask(final L listener, final NotificationTask task) {
223 LOG.debug("{}: Submitting NotificationTask for listener {}", name, listener);
224 executor.execute(task);
228 * Executor task for a single listener that queues notifications and sends them serially to the
231 private class NotificationTask implements Runnable {
232 private final Lock lock = new ReentrantLock();
233 private final Condition notEmpty = lock.newCondition();
234 private final Condition notFull = lock.newCondition();
235 private final @NonNull ForwardingIdentityObject<L> listenerKey;
238 private final Queue<N> queue = new ArrayDeque<>();
240 private boolean exiting;
242 NotificationTask(final @NonNull ForwardingIdentityObject<L> listenerKey,
243 final @NonNull Iterator<N> notifications) {
244 this.listenerKey = requireNonNull(listenerKey);
245 while (notifications.hasNext()) {
246 queue.add(notifications.next());
250 @NonNull Iterator<N> recoverItems() {
251 // This violates @GuardedBy annotation, but is invoked only when the task is not started and will never
252 // get started, hence this is safe.
253 return queue.iterator();
265 boolean submitNotifications(final @NonNull Iterator<N> notifications) throws InterruptedException {
266 final long start = System.nanoTime();
267 final long deadline = start + GIVE_UP_NANOS;
271 // Lock may have blocked for some time, we need to take that into account. We may have exceedded
272 // the deadline, but that is unlikely and even in that case we can make some progress without further
274 long canWait = deadline - System.nanoTime();
277 // Check the exiting flag - if true then #run is in the process of exiting so return
278 // false to indicate such. Otherwise, offer the notifications to the queue.
283 final int avail = maxQueueCapacity - queue.size();
286 LOG.warn("{}: Failed to offer notifications {} to the queue for listener {}. Exceeded"
287 + "maximum allowable time of {} minutes; the listener is likely in an unrecoverable"
288 + "state (deadlock or endless loop). ", name, ImmutableList.copyOf(notifications),
289 listenerKey, MAX_NOTIFICATION_OFFER_MINUTES);
293 canWait = notFull.awaitNanos(canWait);
297 for (int i = 0; i < avail; ++i) {
298 if (!notifications.hasNext()) {
303 queue.add(notifications.next());
312 private boolean waitForQueue() {
313 long timeout = TASK_WAIT_NANOS;
315 while (queue.isEmpty()) {
321 timeout = notEmpty.awaitNanos(timeout);
322 } catch (InterruptedException e) {
323 // The executor is probably shutting down so log as debug.
324 LOG.debug("{}: Interrupted trying to remove from {} listener's queue", name, listenerKey);
335 // Loop until we've dispatched all the notifications in the queue.
337 final @NonNull ImmutableList<N> notifications;
341 if (!waitForQueue()) {
346 // Splice the entire queue
347 notifications = ImmutableList.copyOf(queue);
355 invokeListener(notifications);
358 // We're exiting, gracefully or not - either way make sure we always remove
359 // ourselves from the cache.
360 listenerCache.remove(listenerKey, this);
364 @SuppressWarnings("checkstyle:illegalCatch")
365 private void invokeListener(final @NonNull ImmutableList<N> notifications) {
366 LOG.debug("{}: Invoking listener {} with notification: {}", name, listenerKey, notifications);
368 listenerInvoker.invokeListener(listenerKey.getDelegate(), notifications);
369 } catch (Exception e) {
370 // We'll let a RuntimeException from the listener slide and keep sending any remaining notifications.
371 LOG.error("{}: Error notifying listener {} with {}", name, listenerKey, notifications, e);