2 * Copyright (c) 2014 Brocade Communications Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.yangtools.util.concurrent;
11 import com.google.common.base.Preconditions;
12 import java.util.ArrayList;
13 import java.util.Collections;
14 import java.util.List;
15 import java.util.concurrent.BlockingQueue;
16 import java.util.concurrent.ConcurrentHashMap;
17 import java.util.concurrent.ConcurrentMap;
18 import java.util.concurrent.Executor;
19 import java.util.concurrent.LinkedBlockingQueue;
20 import java.util.concurrent.RejectedExecutionException;
21 import java.util.concurrent.TimeUnit;
22 import java.util.concurrent.locks.Lock;
23 import java.util.concurrent.locks.ReentrantLock;
24 import javax.annotation.concurrent.GuardedBy;
25 import org.slf4j.Logger;
26 import org.slf4j.LoggerFactory;
29 * This class manages queuing and dispatching notifications for multiple listeners concurrently.
30 * Notifications are queued on a per-listener basis and dispatched serially to each listener via an
33 * <p>This class optimizes its memory footprint by only allocating and maintaining a queue and executor
34 * task for a listener when there are pending notifications. On the first notification(s), a queue
35 * is created and a task is submitted to the executor to dispatch the queue to the associated
36 * listener. Any subsequent notifications that occur before all previous notifications have been
37 * dispatched are appended to the existing queue. When all notifications have been dispatched, the
38 * queue and task are discarded.
40 * @author Thomas Pantelis
42 * @param <L> the listener type
43 * @param <N> the notification type
45 public class QueuedNotificationManager<L,N> implements NotificationManager<L,N> {
48 * Interface implemented by clients that does the work of invoking listeners with notifications.
50 * @author Thomas Pantelis
52 * @param <L> the listener type
53 * @param <N> the notification type
55 public interface Invoker<L,N> {
58 * Called to invoke a listener with a notification.
60 * @param listener the listener to invoke
61 * @param notification the notification to send
63 void invokeListener( L listener, N notification );
66 private static final Logger LOG = LoggerFactory.getLogger( QueuedNotificationManager.class );
69 * Caps the maximum number of attempts to offer notification to a particular listener. Each
70 * attempt window is 1 minute, so an offer times out after roughly 10 minutes.
72 private static final int MAX_NOTIFICATION_OFFER_ATTEMPTS = 10;
74 private final Executor executor;
75 private final Invoker<L,N> listenerInvoker;
77 private final ConcurrentMap<ListenerKey<L>,NotificationTask>
78 listenerCache = new ConcurrentHashMap<>();
80 private final String name;
81 private final int maxQueueCapacity;
86 * @param executor the {@link Executor} to use for notification tasks
87 * @param listenerInvoker the {@link Invoker} to use for invoking listeners
88 * @param maxQueueCapacity the capacity of each listener queue
89 * @param name the name of this instance for logging info
91 public QueuedNotificationManager( Executor executor, Invoker<L,N> listenerInvoker,
92 int maxQueueCapacity, String name ) {
93 this.executor = Preconditions.checkNotNull( executor );
94 this.listenerInvoker = Preconditions.checkNotNull( listenerInvoker );
95 Preconditions.checkArgument( maxQueueCapacity > 0, "maxQueueCapacity must be > 0 " );
96 this.maxQueueCapacity = maxQueueCapacity;
97 this.name = Preconditions.checkNotNull( name );
101 * @see org.opendaylight.yangtools.util.concurrent.NotificationManager#addNotification(L, N)
104 public void submitNotification( final L listener, final N notification )
105 throws RejectedExecutionException {
107 if (notification == null) {
111 submitNotifications( listener, Collections.singletonList(notification));
115 * @see org.opendaylight.yangtools.util.concurrent.NotificationManager#submitNotifications(L, java.util.Collection)
118 public void submitNotifications( final L listener, final Iterable<N> notifications )
119 throws RejectedExecutionException {
121 if (notifications == null || listener == null) {
125 if (LOG.isTraceEnabled()) {
126 LOG.trace( "{}: submitNotifications for listener {}: {}",
127 name, listener.toString(), notifications );
130 ListenerKey<L> key = new ListenerKey<>( listener );
131 NotificationTask newNotificationTask = null;
133 // Keep looping until we are either able to add a new NotificationTask or are able to
134 // add our notifications to an existing NotificationTask. Eventually one or the other
139 NotificationTask existingTask = listenerCache.get( key );
141 if (existingTask == null || !existingTask.submitNotifications( notifications )) {
143 // Either there's no existing task or we couldn't add our notifications to the
144 // existing one because it's in the process of exiting and removing itself from
145 // the cache. Either way try to put a new task in the cache. If we can't put
146 // then either the existing one is still there and hasn't removed itself quite
147 // yet or some other concurrent thread beat us to the put although this method
148 // shouldn't be called concurrently for the same listener as that would violate
149 // notification ordering. In any case loop back up and try again.
151 if (newNotificationTask == null) {
152 newNotificationTask = new NotificationTask( key, notifications );
155 existingTask = listenerCache.putIfAbsent( key, newNotificationTask );
156 if (existingTask == null) {
158 // We were able to put our new task - now submit it to the executor and
159 // we're done. If it throws a RejectedxecutionException, let that propagate
162 LOG.debug( "{}: Submitting NotificationTask for listener {}",
163 name, listener.toString() );
165 executor.execute( newNotificationTask );
170 // We were able to add our notifications to an existing task so we're done.
175 } catch (InterruptedException e) {
177 // We were interrupted trying to offer to the listener's queue. Somebody's probably
178 // telling us to quit.
180 LOG.debug( "{}: Interrupted trying to add to {} listener's queue",
181 name, listener.toString() );
184 if (LOG.isTraceEnabled()) {
185 LOG.trace( "{}: submitNotifications dine for listener {}",
186 name, listener.toString() );
191 * Returns {@link ListenerNotificationQueueStats} instances for each current listener
192 * notification task in progress.
194 public List<ListenerNotificationQueueStats> getListenerNotificationQueueStats() {
195 List<ListenerNotificationQueueStats> statsList = new ArrayList<>( listenerCache.size() );
196 for (NotificationTask task: listenerCache.values()) {
197 statsList.add( new ListenerNotificationQueueStats(
198 task.listenerKey.toString(), task.notificationQueue.size() ) );
205 * Returns the maximum listener queue capacity.
207 public int getMaxQueueCapacity() {
208 return maxQueueCapacity;
212 * Returns the {@link Executor} to used for notification tasks.
214 public Executor getExecutor() {
219 * Used as the listenerCache map key. We key by listener reference identity hashCode/equals.
220 * Since we don't know anything about the listener class implementations and we're mixing
221 * multiple listener class instances in the same map, this avoids any potential issue with an
222 * equals implementation that just blindly casts the other Object to compare instead of checking
225 private static class ListenerKey<L> {
227 private final L listener;
229 ListenerKey( L listener ) {
230 this.listener = listener;
238 public int hashCode() {
239 return System.identityHashCode( listener );
243 public boolean equals( Object obj ) {
247 if (!(obj instanceof ListenerKey<?>)) {
251 ListenerKey<?> other = (ListenerKey<?>) obj;
252 return listener == other.listener;
256 public String toString() {
257 return listener.toString();
262 * Executor task for a single listener that queues notifications and sends them serially to the
265 private class NotificationTask implements Runnable {
267 private final BlockingQueue<N> notificationQueue;
269 private volatile boolean done = false;
271 @GuardedBy("queuingLock")
272 private boolean queuedNotifications = false;
274 private final Lock queuingLock = new ReentrantLock();
276 private final ListenerKey<L> listenerKey;
278 NotificationTask( ListenerKey<L> listenerKey, Iterable<N> notifications ) {
280 this.listenerKey = listenerKey;
281 this.notificationQueue = new LinkedBlockingQueue<>( maxQueueCapacity );
283 for (N notification: notifications) {
284 this.notificationQueue.add( notification );
288 boolean submitNotifications( Iterable<N> notifications ) throws InterruptedException {
293 // Check the done flag - if true then #run is in the process of exiting so return
294 // false to indicate such. Otherwise, offer the notifications to the queue.
300 for (N notification: notifications) {
301 boolean notificationOfferAttemptSuccess = false;
302 // The offer is attempted for up to 10 minutes, with a status message printed each minute
303 for (int notificationOfferAttempts = 0;
304 notificationOfferAttempts < MAX_NOTIFICATION_OFFER_ATTEMPTS; notificationOfferAttempts++) {
306 // Try to offer for up to a minute and log a message if it times out.
307 if (LOG.isDebugEnabled()) {
308 LOG.debug( "{}: Offering notification to the queue for listener {}: {}",
309 name, listenerKey.toString(), notification );
312 if (notificationOfferAttemptSuccess = notificationQueue.offer(
313 notification, 1, TimeUnit.MINUTES)) {
318 "{}: Timed out trying to offer a notification to the queue for listener {} "
319 + "on attempt {} of {}. " + "The queue has reached its capacity of {}",
320 name, listenerKey.toString(), notificationOfferAttempts,
321 MAX_NOTIFICATION_OFFER_ATTEMPTS, maxQueueCapacity);
323 if (!notificationOfferAttemptSuccess) {
325 "{}: Failed to offer a notification to the queue for listener {}. "
326 + "Exceeded max allowable attempts of {} in {} minutes; the listener "
327 + "is likely in an unrecoverable state (deadlock or endless loop).",
328 name, listenerKey.toString(), MAX_NOTIFICATION_OFFER_ATTEMPTS,
329 MAX_NOTIFICATION_OFFER_ATTEMPTS);
333 // Set the queuedNotifications flag to tell #run that we've just queued
334 // notifications and not to exit yet, even if it thinks the queue is empty at this
337 queuedNotifications = true;
340 queuingLock.unlock();
350 // Loop until we've dispatched all the notifications in the queue.
354 // Get the notification at the head of the queue, waiting a little bit for one
357 N notification = notificationQueue.poll( 10, TimeUnit.MILLISECONDS );
358 if (notification == null) {
360 // The queue is empty - try to get the queuingLock. If we can't get the lock
361 // then #submitNotifications is in the process of offering to the queue so
362 // we'll loop back up and poll the queue again.
364 if (queuingLock.tryLock()) {
367 // Check the queuedNotifications flag to see if #submitNotifications
368 // has offered new notification(s) to the queue. If so, loop back up
369 // and poll the queue again. Otherwise set done to true and exit.
370 // Once we set the done flag and unlock, calls to
371 // #submitNotifications will fail and a new task will be created.
373 if (!queuedNotifications) {
378 // Clear the queuedNotifications flag so we'll try to exit the next
379 // time through the loop when the queue is empty.
381 queuedNotifications = false;
384 queuingLock.unlock();
389 notifyListener( notification );
391 } catch (InterruptedException e) {
393 // The executor is probably shutting down so log as debug.
394 LOG.debug( "{}: Interrupted trying to remove from {} listener's queue",
395 name, listenerKey.toString() );
398 // We're exiting, gracefully or not - either way make sure we always remove
399 // ourselves from the cache.
401 listenerCache.remove( listenerKey );
405 private void notifyListener( N notification ) {
407 if (notification == null) {
413 if (LOG.isDebugEnabled()) {
414 LOG.debug( "{}: Invoking listener {} with notification: {}",
415 name, listenerKey.toString(), notification );
418 listenerInvoker.invokeListener( listenerKey.getListener(), notification );
420 } catch (RuntimeException e ) {
422 // We'll let a RuntimeException from the listener slide and keep sending any
423 // remaining notifications.
425 LOG.error( String.format( "%1$s: Error notifying listener %2$s", name,
426 listenerKey.toString() ), e );
430 // A JVM Error is severe - best practice is to throw them up the chain. Set done to
431 // true so no new notifications can be added to this task as we're about to bail.