/*
 * Copyright (c) 2014 Brocade Communications Systems, Inc. and others. All rights reserved.
 *
 * This program and the accompanying materials are made available under the
 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
 * and is available at http://www.eclipse.org/legal/epl-v10.html
 */
9 package org.opendaylight.yangtools.util.concurrent;
11 import com.google.common.base.Preconditions;
12 import java.util.Collections;
13 import java.util.List;
14 import java.util.concurrent.BlockingQueue;
15 import java.util.concurrent.ConcurrentHashMap;
16 import java.util.concurrent.ConcurrentMap;
17 import java.util.concurrent.Executor;
18 import java.util.concurrent.LinkedBlockingQueue;
19 import java.util.concurrent.RejectedExecutionException;
20 import java.util.concurrent.TimeUnit;
21 import java.util.concurrent.locks.Lock;
22 import java.util.concurrent.locks.ReentrantLock;
23 import java.util.stream.Collectors;
24 import javax.annotation.concurrent.GuardedBy;
25 import org.slf4j.Logger;
26 import org.slf4j.LoggerFactory;
/**
 * This class manages queuing and dispatching notifications for multiple listeners concurrently.
 * Notifications are queued on a per-listener basis and dispatched serially to each listener via an
 * {@link Executor}.
 *
 * <p>This class optimizes its memory footprint by only allocating and maintaining a queue and executor
 * task for a listener when there are pending notifications. On the first notification(s), a queue
 * is created and a task is submitted to the executor to dispatch the queue to the associated
 * listener. Any subsequent notifications that occur before all previous notifications have been
 * dispatched are appended to the existing queue. When all notifications have been dispatched, the
 * queue and task are discarded.
 *
 * @author Thomas Pantelis
 *
 * @param <L> the listener type
 * @param <N> the notification type
 */
45 public class QueuedNotificationManager<L, N> implements NotificationManager<L, N> {
/**
 * Callback implemented by clients to perform the actual delivery of a notification to a listener.
 *
 * <p>The interface declares exactly one abstract method, so it can be supplied as a lambda or
 * method reference; {@code @FunctionalInterface} makes the compiler enforce that shape.
 *
 * @author Thomas Pantelis
 *
 * @param <L> the listener type
 * @param <N> the notification type
 */
@FunctionalInterface
public interface Invoker<L, N> {

    /**
     * Called to invoke a listener with a notification.
     *
     * @param listener the listener to invoke
     * @param notification the notification to send
     */
    void invokeListener(L listener, N notification);
}
66 private static final Logger LOG = LoggerFactory.getLogger(QueuedNotificationManager.class);
69 * Caps the maximum number of attempts to offer notification to a particular listener. Each
70 * attempt window is 1 minute, so an offer times out after roughly 10 minutes.
72 private static final int MAX_NOTIFICATION_OFFER_ATTEMPTS = 10;
74 private final ConcurrentMap<ListenerKey<L>, NotificationTask> listenerCache = new ConcurrentHashMap<>();
75 private final Invoker<L, N> listenerInvoker;
76 private final Executor executor;
77 private final String name;
78 private final int maxQueueCapacity;
/**
 * Creates a manager that dispatches queued notifications on the given executor.
 *
 * @param executor the {@link Executor} to use for notification tasks
 * @param listenerInvoker the {@link Invoker} to use for invoking listeners
 * @param maxQueueCapacity the capacity of each listener queue, must be greater than zero
 * @param name the name of this instance for logging info
 * @throws IllegalArgumentException if {@code maxQueueCapacity} is not positive
 * @throws NullPointerException if {@code executor}, {@code listenerInvoker} or {@code name} is null
 */
public QueuedNotificationManager(final Executor executor, final Invoker<L, N> listenerInvoker,
        final int maxQueueCapacity, final String name) {
    // Validate the capacity first, then record it; the null checks follow in declaration order.
    Preconditions.checkArgument(maxQueueCapacity > 0, "Invalid maxQueueCapacity %s must be > 0", maxQueueCapacity);
    this.maxQueueCapacity = maxQueueCapacity;

    this.executor = Preconditions.checkNotNull(executor);
    this.listenerInvoker = Preconditions.checkNotNull(listenerInvoker);
    this.name = Preconditions.checkNotNull(name);
}
98 * @see org.opendaylight.yangtools.util.concurrent.NotificationManager#addNotification(L, N)
101 public void submitNotification(final L listener, final N notification) throws RejectedExecutionException {
102 if (notification != null) {
103 submitNotifications(listener, Collections.singletonList(notification));
108 * @see org.opendaylight.yangtools.util.concurrent.NotificationManager#submitNotifications(L, java.util.Collection)
111 public void submitNotifications(final L listener, final Iterable<N> notifications)
112 throws RejectedExecutionException {
114 if (notifications == null || listener == null) {
118 LOG.trace("{}: submitNotifications for listener {}: {}", name, listener, notifications);
120 final ListenerKey<L> key = new ListenerKey<>(listener);
122 // Keep looping until we are either able to add a new NotificationTask or are able to
123 // add our notifications to an existing NotificationTask. Eventually one or the other
126 NotificationTask newNotificationTask = null;
129 final NotificationTask existingTask = listenerCache.get(key);
130 if (existingTask != null && existingTask.submitNotifications(notifications)) {
131 // We were able to add our notifications to an existing task so we're done.
135 // Either there's no existing task or we couldn't add our notifications to the
136 // existing one because it's in the process of exiting and removing itself from
137 // the cache. Either way try to put a new task in the cache. If we can't put
138 // then either the existing one is still there and hasn't removed itself quite
139 // yet or some other concurrent thread beat us to the put although this method
140 // shouldn't be called concurrently for the same listener as that would violate
141 // notification ordering. In any case loop back up and try again.
143 if (newNotificationTask == null) {
144 newNotificationTask = new NotificationTask(key, notifications);
146 final NotificationTask oldTask = listenerCache.putIfAbsent(key, newNotificationTask);
147 if (oldTask == null) {
148 // We were able to put our new task - now submit it to the executor and
149 // we're done. If it throws a RejectedxecutionException, let that propagate
152 LOG.debug("{}: Submitting NotificationTask for listener {}", name, listener);
153 executor.execute(newNotificationTask);
157 LOG.debug("{}: retrying task queueing for {}", name, listener);
159 } catch (InterruptedException e) {
160 // We were interrupted trying to offer to the listener's queue. Somebody's probably
161 // telling us to quit.
162 LOG.warn("{}: Interrupted trying to add to {} listener's queue", name, listener);
165 LOG.trace("{}: submitNotifications dine for listener {}", name, listener);
169 * Returns {@link ListenerNotificationQueueStats} instances for each current listener
170 * notification task in progress.
172 public List<ListenerNotificationQueueStats> getListenerNotificationQueueStats() {
173 return listenerCache.values().stream().map(t -> new ListenerNotificationQueueStats(t.listenerKey.toString(),
174 t.notificationQueue.size())).collect(Collectors.toList());
178 * Returns the maximum listener queue capacity.
180 public int getMaxQueueCapacity() {
181 return maxQueueCapacity;
185 * Returns the {@link Executor} to used for notification tasks.
187 public Executor getExecutor() {
192 * Used as the listenerCache map key. We key by listener reference identity hashCode/equals.
193 * Since we don't know anything about the listener class implementations and we're mixing
194 * multiple listener class instances in the same map, this avoids any potential issue with an
195 * equals implementation that just blindly casts the other Object to compare instead of checking
198 private static final class ListenerKey<L> {
199 private final L listener;
201 ListenerKey(final L listener) {
202 this.listener = Preconditions.checkNotNull(listener);
210 public int hashCode() {
211 return System.identityHashCode(listener);
215 public boolean equals(final Object obj) {
219 return (obj instanceof ListenerKey<?>) && listener == ((ListenerKey<?>) obj).listener;
223 public String toString() {
224 return listener.toString();
/**
 * Executor task for a single listener that queues notifications and sends them serially to the
 * listener.
 */
232 private class NotificationTask implements Runnable {
233 private final Lock queuingLock = new ReentrantLock();
234 private final BlockingQueue<N> notificationQueue;
235 private final ListenerKey<L> listenerKey;
237 @GuardedBy("queuingLock")
238 private boolean queuedNotifications = false;
239 private volatile boolean done = false;
/**
 * Creates a task for one listener and seeds its queue with the initial notifications.
 *
 * @param listenerKey the key identifying the listener, must not be null
 * @param notifications the first notifications to dispatch, added in iteration order
 */
NotificationTask(final ListenerKey<L> listenerKey, final Iterable<N> notifications) {
    this.listenerKey = Preconditions.checkNotNull(listenerKey);
    this.notificationQueue = new LinkedBlockingQueue<>(maxQueueCapacity);

    // The queue is bounded, so add() throws if the initial batch exceeds the capacity.
    notifications.forEach(this.notificationQueue::add);
}
250 @GuardedBy("queuingLock")
251 private void publishNotification(final N notification) throws InterruptedException {
252 // The offer is attempted for up to 10 minutes, with a status message printed each minute
253 for (int notificationOfferAttempts = 0;
254 notificationOfferAttempts < MAX_NOTIFICATION_OFFER_ATTEMPTS; notificationOfferAttempts++) {
256 // Try to offer for up to a minute and log a message if it times out.
257 LOG.debug("{}: Offering notification to the queue for listener {}: {}", name, listenerKey,
260 if (notificationQueue.offer(notification, 1, TimeUnit.MINUTES)) {
264 LOG.warn("{}: Timed out trying to offer a notification to the queue for listener {} "
265 + "on attempt {} of {}. The queue has reached its capacity of {}", name, listenerKey,
266 notificationOfferAttempts, MAX_NOTIFICATION_OFFER_ATTEMPTS, maxQueueCapacity);
269 LOG.warn("{}: Failed to offer a notification to the queue for listener {}. Exceeded max allowable attempts"
270 + " of {} in {} minutes; the listener is likely in an unrecoverable state (deadlock or endless"
271 + " loop).", name, listenerKey, MAX_NOTIFICATION_OFFER_ATTEMPTS, MAX_NOTIFICATION_OFFER_ATTEMPTS);
274 boolean submitNotifications(final Iterable<N> notifications) throws InterruptedException {
279 // Check the done flag - if true then #run is in the process of exiting so return
280 // false to indicate such. Otherwise, offer the notifications to the queue.
286 for (N notification : notifications) {
287 publishNotification(notification);
290 // Set the queuedNotifications flag to tell #run that we've just queued
291 // notifications and not to exit yet, even if it thinks the queue is empty at this
294 queuedNotifications = true;
296 queuingLock.unlock();
305 // Loop until we've dispatched all the notifications in the queue.
308 // Get the notification at the head of the queue, waiting a little bit for one
311 final N notification = notificationQueue.poll(10, TimeUnit.MILLISECONDS);
312 if (notification == null) {
314 // The queue is empty - try to get the queuingLock. If we can't get the lock
315 // then #submitNotifications is in the process of offering to the queue so
316 // we'll loop back up and poll the queue again.
318 if (queuingLock.tryLock()) {
321 // Check the queuedNotifications flag to see if #submitNotifications
322 // has offered new notification(s) to the queue. If so, loop back up
323 // and poll the queue again. Otherwise set done to true and exit.
324 // Once we set the done flag and unlock, calls to
325 // #submitNotifications will fail and a new task will be created.
327 if (!queuedNotifications) {
332 // Clear the queuedNotifications flag so we'll try to exit the next
333 // time through the loop when the queue is empty.
335 queuedNotifications = false;
338 queuingLock.unlock();
343 notifyListener(notification);
345 } catch (InterruptedException e) {
346 // The executor is probably shutting down so log as debug.
347 LOG.debug("{}: Interrupted trying to remove from {} listener's queue", name, listenerKey);
349 // We're exiting, gracefully or not - either way make sure we always remove
350 // ourselves from the cache.
351 listenerCache.remove(listenerKey, this);
355 private void notifyListener(final N notification) {
356 if (notification == null) {
360 LOG.debug("{}: Invoking listener {} with notification: {}", name, listenerKey, notification);
362 listenerInvoker.invokeListener(listenerKey.getListener(), notification);
363 } catch (Exception e) {
364 // We'll let a RuntimeException from the listener slide and keep sending any remaining notifications.
365 LOG.error(String.format("%1$s: Error notifying listener %2$s", name, listenerKey), e);