From 8fb1f4dda6d063c1a3b11139ec21e1d5ee880075 Mon Sep 17 00:00:00 2001 From: Robert Varga Date: Tue, 20 Sep 2016 23:59:32 +0200 Subject: [PATCH] Cleanup QueuedNotificationManager Audit code for coding style and unnecessary checks. Change-Id: Ifce7d4fa4febb7f63ce9014e350e69143508e5c7 Signed-off-by: Robert Varga (cherry picked from commit a0d690e3b716c7574c8fa39b3291220a42a7bdd0) --- .../util/concurrent/NotificationManager.java | 4 +- .../concurrent/QueuedNotificationManager.java | 265 +++++++----------- 2 files changed, 100 insertions(+), 169 deletions(-) diff --git a/common/util/src/main/java/org/opendaylight/yangtools/util/concurrent/NotificationManager.java b/common/util/src/main/java/org/opendaylight/yangtools/util/concurrent/NotificationManager.java index a184ec6790..531adfcb81 100644 --- a/common/util/src/main/java/org/opendaylight/yangtools/util/concurrent/NotificationManager.java +++ b/common/util/src/main/java/org/opendaylight/yangtools/util/concurrent/NotificationManager.java @@ -29,7 +29,7 @@ public interface NotificationManager { * @param notification the notification to dispatch * @throws RejectedExecutionException if the notification can't be queued for dispatching */ - void submitNotification( L listener, N notification ); + void submitNotification(L listener, N notification); /** * Submits notifications to be queued and dispatched to the given listener. @@ -40,6 +40,6 @@ public interface NotificationManager { * @param notifications the notifications to dispatch * @throws RejectedExecutionException if a notification can't be queued for dispatching */ - void submitNotifications( final L listener, Iterable notifications); + void submitNotifications(L listener, Iterable notifications); } diff --git a/common/util/src/main/java/org/opendaylight/yangtools/util/concurrent/QueuedNotificationManager.java b/common/util/src/main/java/org/opendaylight/yangtools/util/concurrent/QueuedNotificationManager.java index 90783f2cdc..c0fb5602fa 100644 --- a/common/util/src/main/java/org/opendaylight/yangtools/util/concurrent/QueuedNotificationManager.java +++ b/common/util/src/main/java/org/opendaylight/yangtools/util/concurrent/QueuedNotificationManager.java @@ -9,7 +9,6 @@ package org.opendaylight.yangtools.util.concurrent; import com.google.common.base.Preconditions; -import java.util.ArrayList; import java.util.Collections; import java.util.List; import java.util.concurrent.BlockingQueue; @@ -21,6 +20,7 @@ import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; +import java.util.stream.Collectors; import javax.annotation.concurrent.GuardedBy; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -42,7 +42,7 @@ import org.slf4j.LoggerFactory; * @param the listener type * @param the notification type */ -public class QueuedNotificationManager implements NotificationManager { +public class QueuedNotificationManager implements NotificationManager { /** * Interface implemented by clients that does the work of invoking listeners with notifications. @@ -52,7 +52,7 @@ public class QueuedNotificationManager implements NotificationManager * @param the listener type * @param the notification type */ - public interface Invoker { + public interface Invoker { /** * Called to invoke a listener with a notification. @@ -60,10 +60,10 @@ public class QueuedNotificationManager implements NotificationManager * @param listener the listener to invoke * @param notification the notification to send */ - void invokeListener( L listener, N notification ); + void invokeListener(L listener, N notification); } - private static final Logger LOG = LoggerFactory.getLogger( QueuedNotificationManager.class ); + private static final Logger LOG = LoggerFactory.getLogger(QueuedNotificationManager.class); /** * Caps the maximum number of attempts to offer notification to a particular listener. Each @@ -71,12 +71,9 @@ public class QueuedNotificationManager implements NotificationManager */ private static final int MAX_NOTIFICATION_OFFER_ATTEMPTS = 10; + private final ConcurrentMap, NotificationTask> listenerCache = new ConcurrentHashMap<>(); + private final Invoker listenerInvoker; private final Executor executor; - private final Invoker listenerInvoker; - - private final ConcurrentMap,NotificationTask> - listenerCache = new ConcurrentHashMap<>(); - private final String name; private final int maxQueueCapacity; @@ -88,103 +85,84 @@ public class QueuedNotificationManager implements NotificationManager * @param maxQueueCapacity the capacity of each listener queue * @param name the name of this instance for logging info */ - public QueuedNotificationManager( Executor executor, Invoker listenerInvoker, - int maxQueueCapacity, String name ) { - this.executor = Preconditions.checkNotNull( executor ); - this.listenerInvoker = Preconditions.checkNotNull( listenerInvoker ); - Preconditions.checkArgument( maxQueueCapacity > 0, "maxQueueCapacity must be > 0 " ); + public QueuedNotificationManager(final Executor executor, final Invoker listenerInvoker, + final int maxQueueCapacity, final String name) { + Preconditions.checkArgument(maxQueueCapacity > 0, "Invalid maxQueueCapacity %s must be > 0", maxQueueCapacity); + this.executor = Preconditions.checkNotNull(executor); + this.listenerInvoker = Preconditions.checkNotNull(listenerInvoker); this.maxQueueCapacity = maxQueueCapacity; - this.name = Preconditions.checkNotNull( name ); + this.name = Preconditions.checkNotNull(name); } /* (non-Javadoc) * @see org.opendaylight.yangtools.util.concurrent.NotificationManager#addNotification(L, N) */ @Override - public void submitNotification( final L listener, final N notification ) - throws RejectedExecutionException { - - if (notification == null) { - return; + public void submitNotification(final L listener, final N notification) throws RejectedExecutionException { + if (notification != null) { + submitNotifications(listener, Collections.singletonList(notification)); } - - submitNotifications( listener, Collections.singletonList(notification)); } /* (non-Javadoc) * @see org.opendaylight.yangtools.util.concurrent.NotificationManager#submitNotifications(L, java.util.Collection) */ @Override - public void submitNotifications( final L listener, final Iterable notifications ) + public void submitNotifications(final L listener, final Iterable notifications) throws RejectedExecutionException { if (notifications == null || listener == null) { return; } - if (LOG.isTraceEnabled()) { - LOG.trace( "{}: submitNotifications for listener {}: {}", - name, listener.toString(), notifications ); - } + LOG.trace("{}: submitNotifications for listener {}: {}", name, listener, notifications); - ListenerKey key = new ListenerKey<>( listener ); - NotificationTask newNotificationTask = null; + final ListenerKey key = new ListenerKey<>(listener); // Keep looping until we are either able to add a new NotificationTask or are able to // add our notifications to an existing NotificationTask. Eventually one or the other // will occur. - try { - while (true) { - NotificationTask existingTask = listenerCache.get( key ); - - if (existingTask == null || !existingTask.submitNotifications( notifications )) { - - // Either there's no existing task or we couldn't add our notifications to the - // existing one because it's in the process of exiting and removing itself from - // the cache. Either way try to put a new task in the cache. If we can't put - // then either the existing one is still there and hasn't removed itself quite - // yet or some other concurrent thread beat us to the put although this method - // shouldn't be called concurrently for the same listener as that would violate - // notification ordering. In any case loop back up and try again. - - if (newNotificationTask == null) { - newNotificationTask = new NotificationTask( key, notifications ); - } - - existingTask = listenerCache.putIfAbsent( key, newNotificationTask ); - if (existingTask == null) { - - // We were able to put our new task - now submit it to the executor and - // we're done. If it throws a RejectedxecutionException, let that propagate - // to the caller. - - LOG.debug( "{}: Submitting NotificationTask for listener {}", - name, listener.toString() ); - - executor.execute( newNotificationTask ); - break; - } - } else { + NotificationTask newNotificationTask = null; + while (true) { + final NotificationTask existingTask = listenerCache.get(key); + if (existingTask != null && existingTask.submitNotifications(notifications)) { // We were able to add our notifications to an existing task so we're done. + break; + } + // Either there's no existing task or we couldn't add our notifications to the + // existing one because it's in the process of exiting and removing itself from + // the cache. Either way try to put a new task in the cache. If we can't put + // then either the existing one is still there and hasn't removed itself quite + // yet or some other concurrent thread beat us to the put although this method + // shouldn't be called concurrently for the same listener as that would violate + // notification ordering. In any case loop back up and try again. + + if (newNotificationTask == null) { + newNotificationTask = new NotificationTask(key, notifications); + } + final NotificationTask oldTask = listenerCache.putIfAbsent(key, newNotificationTask); + if (oldTask == null) { + // We were able to put our new task - now submit it to the executor and + // we're done. If it throws a RejectedxecutionException, let that propagate + // to the caller. + + LOG.debug("{}: Submitting NotificationTask for listener {}", name, listener); + executor.execute(newNotificationTask); break; } + + LOG.debug("{}: retrying task queueing for {}", name, listener); } } catch (InterruptedException e) { - // We were interrupted trying to offer to the listener's queue. Somebody's probably // telling us to quit. - - LOG.debug( "{}: Interrupted trying to add to {} listener's queue", - name, listener.toString() ); + LOG.warn("{}: Interrupted trying to add to {} listener's queue", name, listener); } - if (LOG.isTraceEnabled()) { - LOG.trace( "{}: submitNotifications dine for listener {}", - name, listener.toString() ); - } + LOG.trace("{}: submitNotifications dine for listener {}", name, listener); } /** @@ -192,13 +170,8 @@ public class QueuedNotificationManager implements NotificationManager * notification task in progress. */ public List getListenerNotificationQueueStats() { - List statsList = new ArrayList<>( listenerCache.size() ); - for (NotificationTask task: listenerCache.values()) { - statsList.add( new ListenerNotificationQueueStats( - task.listenerKey.toString(), task.notificationQueue.size() ) ); - } - - return statsList ; + return listenerCache.values().stream().map(t -> new ListenerNotificationQueueStats(t.listenerKey.toString(), + t.notificationQueue.size())).collect(Collectors.toList()); } /** @@ -222,12 +195,11 @@ public class QueuedNotificationManager implements NotificationManager * equals implementation that just blindly casts the other Object to compare instead of checking * for instanceof. */ - private static class ListenerKey { - + private static final class ListenerKey { private final L listener; - ListenerKey( L listener ) { - this.listener = listener; + ListenerKey(final L listener) { + this.listener = Preconditions.checkNotNull(listener); } L getListener() { @@ -236,20 +208,15 @@ public class QueuedNotificationManager implements NotificationManager @Override public int hashCode() { - return System.identityHashCode( listener ); + return System.identityHashCode(listener); } @Override - public boolean equals( Object obj ) { + public boolean equals(final Object obj) { if (obj == this) { return true; } - if (!(obj instanceof ListenerKey)) { - return false; - } - - ListenerKey other = (ListenerKey) obj; - return listener == other.listener; + return (obj instanceof ListenerKey) && listener == ((ListenerKey) obj).listener; } @Override @@ -263,29 +230,48 @@ public class QueuedNotificationManager implements NotificationManager * listener. */ private class NotificationTask implements Runnable { - + private final Lock queuingLock = new ReentrantLock(); private final BlockingQueue notificationQueue; - - private volatile boolean done = false; + private final ListenerKey listenerKey; @GuardedBy("queuingLock") private boolean queuedNotifications = false; + private volatile boolean done = false; - private final Lock queuingLock = new ReentrantLock(); + NotificationTask(final ListenerKey listenerKey, final Iterable notifications) { + this.listenerKey = Preconditions.checkNotNull(listenerKey); + this.notificationQueue = new LinkedBlockingQueue<>(maxQueueCapacity); - private final ListenerKey listenerKey; + for (N notification: notifications) { + this.notificationQueue.add(notification); + } + } - NotificationTask( ListenerKey listenerKey, Iterable notifications ) { + @GuardedBy("queuingLock") + private void publishNotification(final N notification) throws InterruptedException { + // The offer is attempted for up to 10 minutes, with a status message printed each minute + for (int notificationOfferAttempts = 0; + notificationOfferAttempts < MAX_NOTIFICATION_OFFER_ATTEMPTS; notificationOfferAttempts++) { - this.listenerKey = listenerKey; - this.notificationQueue = new LinkedBlockingQueue<>( maxQueueCapacity ); + // Try to offer for up to a minute and log a message if it times out. + LOG.debug("{}: Offering notification to the queue for listener {}: {}", name, listenerKey, + notification); - for (N notification: notifications) { - this.notificationQueue.add( notification ); + if (notificationQueue.offer(notification, 1, TimeUnit.MINUTES)) { + return; + } + + LOG.warn("{}: Timed out trying to offer a notification to the queue for listener {} " + + "on attempt {} of {}. The queue has reached its capacity of {}", name, listenerKey, + notificationOfferAttempts, MAX_NOTIFICATION_OFFER_ATTEMPTS, maxQueueCapacity); } + + LOG.warn("{}: Failed to offer a notification to the queue for listener {}. Exceeded max allowable attempts" + + " of {} in {} minutes; the listener is likely in an unrecoverable state (deadlock or endless" + + " loop).", name, listenerKey, MAX_NOTIFICATION_OFFER_ATTEMPTS, MAX_NOTIFICATION_OFFER_ATTEMPTS); } - boolean submitNotifications( Iterable notifications ) throws InterruptedException { + boolean submitNotifications(final Iterable notifications) throws InterruptedException { queuingLock.lock(); try { @@ -297,37 +283,8 @@ public class QueuedNotificationManager implements NotificationManager return false; } - for (N notification: notifications) { - boolean notificationOfferAttemptSuccess = false; - // The offer is attempted for up to 10 minutes, with a status message printed each minute - for (int notificationOfferAttempts = 0; - notificationOfferAttempts < MAX_NOTIFICATION_OFFER_ATTEMPTS; notificationOfferAttempts++) { - - // Try to offer for up to a minute and log a message if it times out. - if (LOG.isDebugEnabled()) { - LOG.debug( "{}: Offering notification to the queue for listener {}: {}", - name, listenerKey.toString(), notification ); - } - - if (notificationOfferAttemptSuccess = notificationQueue.offer( - notification, 1, TimeUnit.MINUTES)) { - break; - } - - LOG.warn( - "{}: Timed out trying to offer a notification to the queue for listener {} " - + "on attempt {} of {}. " + "The queue has reached its capacity of {}", - name, listenerKey.toString(), notificationOfferAttempts, - MAX_NOTIFICATION_OFFER_ATTEMPTS, maxQueueCapacity); - } - if (!notificationOfferAttemptSuccess) { - LOG.warn( - "{}: Failed to offer a notification to the queue for listener {}. " - + "Exceeded max allowable attempts of {} in {} minutes; the listener " - + "is likely in an unrecoverable state (deadlock or endless loop).", - name, listenerKey.toString(), MAX_NOTIFICATION_OFFER_ATTEMPTS, - MAX_NOTIFICATION_OFFER_ATTEMPTS); - } + for (N notification : notifications) { + publishNotification(notification); } // Set the queuedNotifications flag to tell #run that we've just queued @@ -335,7 +292,6 @@ public class QueuedNotificationManager implements NotificationManager // point. queuedNotifications = true; - } finally { queuingLock.unlock(); } @@ -345,16 +301,14 @@ public class QueuedNotificationManager implements NotificationManager @Override public void run() { - try { // Loop until we've dispatched all the notifications in the queue. while (true) { - // Get the notification at the head of the queue, waiting a little bit for one // to get offered. - N notification = notificationQueue.poll( 10, TimeUnit.MILLISECONDS ); + final N notification = notificationQueue.poll(10, TimeUnit.MILLISECONDS); if (notification == null) { // The queue is empty - try to get the queuingLock. If we can't get the lock @@ -386,52 +340,29 @@ public class QueuedNotificationManager implements NotificationManager } } - notifyListener( notification ); + notifyListener(notification); } } catch (InterruptedException e) { - // The executor is probably shutting down so log as debug. - LOG.debug( "{}: Interrupted trying to remove from {} listener's queue", - name, listenerKey.toString() ); + LOG.debug("{}: Interrupted trying to remove from {} listener's queue", name, listenerKey); } finally { - // We're exiting, gracefully or not - either way make sure we always remove // ourselves from the cache. - - listenerCache.remove( listenerKey ); + listenerCache.remove(listenerKey, this); } } - private void notifyListener( N notification ) { - + private void notifyListener(final N notification) { if (notification == null) { return; } + LOG.debug("{}: Invoking listener {} with notification: {}", name, listenerKey, notification); try { - - if (LOG.isDebugEnabled()) { - LOG.debug( "{}: Invoking listener {} with notification: {}", - name, listenerKey.toString(), notification ); - } - - listenerInvoker.invokeListener( listenerKey.getListener(), notification ); - - } catch (RuntimeException e ) { - - // We'll let a RuntimeException from the listener slide and keep sending any - // remaining notifications. - - LOG.error( String.format( "%1$s: Error notifying listener %2$s", name, - listenerKey.toString() ), e ); - - } catch (Error e) { - - // A JVM Error is severe - best practice is to throw them up the chain. Set done to - // true so no new notifications can be added to this task as we're about to bail. - - done = true; - throw e; + listenerInvoker.invokeListener(listenerKey.getListener(), notification); + } catch (Exception e) { + // We'll let a RuntimeException from the listener slide and keep sending any remaining notifications. + LOG.error(String.format("%1$s: Error notifying listener %2$s", name, listenerKey), e); } } } -- 2.36.6