2 * Copyright (c) 2014 Brocade Communications Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.yangtools.util.concurrent;
11 import static com.google.common.base.Preconditions.checkArgument;
12 import static java.util.Objects.requireNonNull;
14 import com.google.common.collect.ImmutableList;
15 import java.util.ArrayDeque;
16 import java.util.Collection;
17 import java.util.Collections;
18 import java.util.Iterator;
19 import java.util.List;
20 import java.util.Queue;
21 import java.util.concurrent.ConcurrentHashMap;
22 import java.util.concurrent.ConcurrentMap;
23 import java.util.concurrent.Executor;
24 import java.util.concurrent.RejectedExecutionException;
25 import java.util.concurrent.TimeUnit;
26 import java.util.concurrent.locks.Condition;
27 import java.util.concurrent.locks.Lock;
28 import java.util.concurrent.locks.ReentrantLock;
29 import java.util.stream.Collectors;
30 import javax.annotation.Nonnull;
31 import javax.annotation.concurrent.GuardedBy;
32 import org.slf4j.Logger;
33 import org.slf4j.LoggerFactory;
36 * This class manages queuing and dispatching notifications for multiple listeners concurrently.
37 * Notifications are queued on a per-listener basis and dispatched serially to each listener via an
40 * <p>This class optimizes its memory footprint by only allocating and maintaining a queue and executor
41 * task for a listener when there are pending notifications. On the first notification(s), a queue
42 * is created and a task is submitted to the executor to dispatch the queue to the associated
43 * listener. Any subsequent notifications that occur before all previous notifications have been
44 * dispatched are appended to the existing queue. When all notifications have been dispatched, the
45 * queue and task are discarded.
47 * @author Thomas Pantelis
49 * @param <L> the listener type
50 * @param <N> the notification type
52 public class QueuedNotificationManager<L, N> implements NotificationManager<L, N> {
55 * Interface implemented by clients that does the work of invoking listeners with notifications.
57 * @author Thomas Pantelis
59 * @param <L> the listener type
60 * @param <N> the notification type
62 * @deprecated Use {@link QueuedNotificationManager.BatchedInvoker} instead.
66 public interface Invoker<L, N> {
68 * Called to invoke a listener with a notification.
70 * @param listener the listener to invoke
71 * @param notification the notification to send
73 void invokeListener(L listener, N notification);
77 public interface BatchedInvoker<L, N> {
79 * Called to invoke a listener with a notification.
81 * @param listener the listener to invoke
82 * @param notifications notifications to send
84 void invokeListener(@Nonnull L listener, @Nonnull Collection<? extends N> notifications);
87 private static final Logger LOG = LoggerFactory.getLogger(QueuedNotificationManager.class);
90 * Caps the maximum number of attempts to offer notification to a particular listener. Each
91 * attempt window is 1 minute, so an offer times out after roughly 10 minutes.
93 private static final int MAX_NOTIFICATION_OFFER_MINUTES = 10;
94 private static final long GIVE_UP_NANOS = TimeUnit.MINUTES.toNanos(MAX_NOTIFICATION_OFFER_MINUTES);
95 private static final long TASK_WAIT_NANOS = TimeUnit.MILLISECONDS.toNanos(10);
97 private final ConcurrentMap<ListenerKey<L>, NotificationTask> listenerCache = new ConcurrentHashMap<>();
98 private final BatchedInvoker<L, N> listenerInvoker;
99 private final Executor executor;
100 private final String name;
101 private final int maxQueueCapacity;
103 private QueuedNotificationManager(final Executor executor, final BatchedInvoker<L, N> listenerInvoker,
104 final int maxQueueCapacity, final String name) {
105 checkArgument(maxQueueCapacity > 0, "Invalid maxQueueCapacity %s must be > 0", maxQueueCapacity);
106 this.executor = requireNonNull(executor);
107 this.listenerInvoker = requireNonNull(listenerInvoker);
108 this.maxQueueCapacity = maxQueueCapacity;
109 this.name = requireNonNull(name);
115 * @param executor the {@link Executor} to use for notification tasks
116 * @param listenerInvoker the {@link Invoker} to use for invoking listeners
117 * @param maxQueueCapacity the capacity of each listener queue
118 * @param name the name of this instance for logging info
120 * @deprecated Use {@link #create(Executor, BatchedInvoker, int, String)} instead.
123 @SuppressWarnings("checkstyle:illegalCatch")
124 public QueuedNotificationManager(final Executor executor, final Invoker<L, N> listenerInvoker,
125 final int maxQueueCapacity, final String name) {
126 this(executor, (BatchedInvoker<L, N>)(listener, notifications) -> notifications.forEach(n -> {
128 listenerInvoker.invokeListener(listener, n);
129 } catch (Exception e) {
130 LOG.error("{}: Error notifying listener {} with {}", name, listener, n, e);
133 }), maxQueueCapacity, name);
134 requireNonNull(listenerInvoker);
138 * Create a new notification manager.
140 * @param executor the {@link Executor} to use for notification tasks
141 * @param listenerInvoker the {@link BatchedInvoker} to use for invoking listeners
142 * @param maxQueueCapacity the capacity of each listener queue
143 * @param name the name of this instance for logging info
145 public static <L, N> QueuedNotificationManager<L, N> create(final Executor executor,
146 final BatchedInvoker<L, N> listenerInvoker, final int maxQueueCapacity, final String name) {
147 return new QueuedNotificationManager<>(executor, listenerInvoker, maxQueueCapacity, name);
151 * Returns the maximum listener queue capacity.
153 public int getMaxQueueCapacity() {
154 return maxQueueCapacity;
158 * Returns the {@link Executor} to used for notification tasks.
160 public Executor getExecutor() {
165 * @see org.opendaylight.yangtools.util.concurrent.NotificationManager#addNotification(L, N)
168 public void submitNotification(final L listener, final N notification) throws RejectedExecutionException {
169 if (notification != null) {
170 submitNotifications(listener, Collections.singletonList(notification));
175 * @see org.opendaylight.yangtools.util.concurrent.NotificationManager#submitNotifications(L, java.util.Collection)
178 public void submitNotifications(final L listener, final Iterable<N> notifications)
179 throws RejectedExecutionException {
181 if (notifications == null || listener == null) {
185 LOG.trace("{}: submitNotifications for listener {}: {}", name, listener, notifications);
187 final ListenerKey<L> key = new ListenerKey<>(listener);
189 // Keep looping until we are either able to add a new NotificationTask or are able to
190 // add our notifications to an existing NotificationTask. Eventually one or the other
193 Iterator<N> it = notifications.iterator();
196 NotificationTask task = listenerCache.get(key);
198 // No task found, try to insert a new one
199 final NotificationTask newTask = new NotificationTask(key, it);
200 task = listenerCache.putIfAbsent(key, newTask);
202 // We were able to put our new task - now submit it to the executor and
203 // we're done. If it throws a RejectedExecutionException, let that propagate
205 runTask(listener, newTask);
209 // We have a racing task, hence we can continue, but we need to refresh our iterator from
211 it = newTask.recoverItems();
214 final boolean completed = task.submitNotifications(it);
216 // Task is indicating it is exiting before it has consumed all the items and is exiting. Rather
217 // than spinning on removal, we try to replace it.
218 final NotificationTask newTask = new NotificationTask(key, it);
219 if (listenerCache.replace(key, task, newTask)) {
220 runTask(listener, newTask);
224 // We failed to replace the task, hence we need retry. Note we have to recover the items to be
225 // published from the new task.
226 it = newTask.recoverItems();
227 LOG.debug("{}: retrying task queueing for {}", name, listener);
231 // All notifications have either been delivered or we have timed out and warned about the ones we
232 // have failed to deliver. In any case we are done here.
235 } catch (InterruptedException e) {
236 // We were interrupted trying to offer to the listener's queue. Somebody's probably
237 // telling us to quit.
238 LOG.warn("{}: Interrupted trying to add to {} listener's queue", name, listener);
241 LOG.trace("{}: submitNotifications done for listener {}", name, listener);
245 * Returns {@link ListenerNotificationQueueStats} instances for each current listener
246 * notification task in progress.
248 public List<ListenerNotificationQueueStats> getListenerNotificationQueueStats() {
249 return listenerCache.values().stream().map(t -> new ListenerNotificationQueueStats(t.listenerKey.toString(),
250 t.size())).collect(Collectors.toList());
253 private void runTask(final L listener, final NotificationTask task) {
254 LOG.debug("{}: Submitting NotificationTask for listener {}", name, listener);
255 executor.execute(task);
259 * Used as the listenerCache map key. We key by listener reference identity hashCode/equals.
260 * Since we don't know anything about the listener class implementations and we're mixing
261 * multiple listener class instances in the same map, this avoids any potential issue with an
262 * equals implementation that just blindly casts the other Object to compare instead of checking
265 private static final class ListenerKey<L> {
266 private final L listener;
268 ListenerKey(final L listener) {
269 this.listener = requireNonNull(listener);
277 public int hashCode() {
278 return System.identityHashCode(listener);
282 public boolean equals(final Object obj) {
286 return obj instanceof ListenerKey<?> && listener == ((ListenerKey<?>) obj).listener;
290 public String toString() {
291 return listener.toString();
296 * Executor task for a single listener that queues notifications and sends them serially to the
299 private class NotificationTask implements Runnable {
301 private final Lock lock = new ReentrantLock();
302 private final Condition notEmpty = lock.newCondition();
303 private final Condition notFull = lock.newCondition();
304 private final ListenerKey<L> listenerKey;
307 private final Queue<N> queue = new ArrayDeque<>();
309 private boolean exiting;
311 NotificationTask(final ListenerKey<L> listenerKey, final Iterator<N> notifications) {
312 this.listenerKey = requireNonNull(listenerKey);
313 while (notifications.hasNext()) {
314 queue.offer(notifications.next());
318 Iterator<N> recoverItems() {
319 // This violates @GuardedBy annotation, but is invoked only when the task is not started and will never
320 // get started, hence this is safe.
321 return queue.iterator();
333 boolean submitNotifications(final Iterator<N> notifications) throws InterruptedException {
334 final long start = System.nanoTime();
335 final long deadline = start + GIVE_UP_NANOS;
339 // Lock may have blocked for some time, we need to take that into account. We may have exceedded
340 // the deadline, but that is unlikely and even in that case we can make some progress without further
342 long canWait = deadline - System.nanoTime();
345 // Check the exiting flag - if true then #run is in the process of exiting so return
346 // false to indicate such. Otherwise, offer the notifications to the queue.
351 final int avail = maxQueueCapacity - queue.size();
354 LOG.warn("{}: Failed to offer notifications {} to the queue for listener {}. Exceeded"
355 + "maximum allowable time of {} minutes; the listener is likely in an unrecoverable"
356 + "state (deadlock or endless loop). ", name, ImmutableList.copyOf(notifications),
357 listenerKey, MAX_NOTIFICATION_OFFER_MINUTES);
361 canWait = notFull.awaitNanos(canWait);
365 for (int i = 0; i < avail; ++i) {
366 if (!notifications.hasNext()) {
371 queue.offer(notifications.next());
380 private boolean waitForQueue() {
381 long timeout = TASK_WAIT_NANOS;
383 while (queue.isEmpty()) {
389 timeout = notEmpty.awaitNanos(timeout);
390 } catch (InterruptedException e) {
391 // The executor is probably shutting down so log as debug.
392 LOG.debug("{}: Interrupted trying to remove from {} listener's queue", name, listenerKey);
403 // Loop until we've dispatched all the notifications in the queue.
405 final Collection<N> notifications;
409 if (!waitForQueue()) {
414 // Splice the entire queue
415 notifications = ImmutableList.copyOf(queue);
423 invokeListener(notifications);
426 // We're exiting, gracefully or not - either way make sure we always remove
427 // ourselves from the cache.
428 listenerCache.remove(listenerKey, this);
432 @SuppressWarnings("checkstyle:illegalCatch")
433 private void invokeListener(final Collection<N> notifications) {
434 LOG.debug("{}: Invoking listener {} with notification: {}", name, listenerKey, notifications);
436 listenerInvoker.invokeListener(listenerKey.getListener(), notifications);
437 } catch (Exception e) {
438 // We'll let a RuntimeException from the listener slide and keep sending any remaining notifications.
439 LOG.error("{}: Error notifying listener {} with {}", name, listenerKey, notifications, e);