Cleanup QueuedNotificationManager 38/45938/2
authorRobert Varga <rovarga@cisco.com>
Tue, 20 Sep 2016 21:59:32 +0000 (23:59 +0200)
committerRobert Varga <nite@hq.sk>
Wed, 21 Sep 2016 15:27:12 +0000 (15:27 +0000)
Audit code for coding style and unnecessary checks.

Change-Id: Ifce7d4fa4febb7f63ce9014e350e69143508e5c7
Signed-off-by: Robert Varga <rovarga@cisco.com>
common/util/src/main/java/org/opendaylight/yangtools/util/concurrent/NotificationManager.java
common/util/src/main/java/org/opendaylight/yangtools/util/concurrent/QueuedNotificationManager.java

index a184ec679024b95a49a08c088013d5b53f927892..531adfcb812fb373d35876a0502750c18453e545 100644 (file)
@@ -29,7 +29,7 @@ public interface NotificationManager<L, N> {
      * @param notification the notification to dispatch
      * @throws RejectedExecutionException if the notification can't be queued for dispatching
      */
-    void submitNotification( L listener, N notification );
+    void submitNotification(L listener, N notification);
 
     /**
      * Submits notifications to be queued and dispatched to the given listener.
@@ -40,6 +40,6 @@ public interface NotificationManager<L, N> {
      * @param notifications the notifications to dispatch
      * @throws RejectedExecutionException if a notification can't be queued for dispatching
      */
-    void submitNotifications( final L listener, Iterable<N> notifications);
+    void submitNotifications(L listener, Iterable<N> notifications);
 
 }
index 90783f2cdc32d06be07e47c907a7504bd543db67..c0fb5602fa89416534a641f71173d88ee2887261 100644 (file)
@@ -9,7 +9,6 @@
 package org.opendaylight.yangtools.util.concurrent;
 
 import com.google.common.base.Preconditions;
-import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 import java.util.concurrent.BlockingQueue;
@@ -21,6 +20,7 @@ import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
+import java.util.stream.Collectors;
 import javax.annotation.concurrent.GuardedBy;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -42,7 +42,7 @@ import org.slf4j.LoggerFactory;
  * @param <L> the listener type
  * @param <N> the notification type
  */
-public class QueuedNotificationManager<L,N> implements NotificationManager<L,N> {
+public class QueuedNotificationManager<L, N> implements NotificationManager<L, N> {
 
     /**
      * Interface implemented by clients that does the work of invoking listeners with notifications.
@@ -52,7 +52,7 @@ public class QueuedNotificationManager<L,N> implements NotificationManager<L,N>
      * @param <L> the listener type
      * @param <N> the notification type
      */
-    public interface Invoker<L,N> {
+    public interface Invoker<L, N> {
 
         /**
          * Called to invoke a listener with a notification.
@@ -60,10 +60,10 @@ public class QueuedNotificationManager<L,N> implements NotificationManager<L,N>
          * @param listener the listener to invoke
          * @param notification the notification to send
          */
-        void invokeListener( L listener, N notification );
+        void invokeListener(L listener, N notification);
     }
 
-    private static final Logger LOG = LoggerFactory.getLogger( QueuedNotificationManager.class );
+    private static final Logger LOG = LoggerFactory.getLogger(QueuedNotificationManager.class);
 
     /**
      * Caps the maximum number of attempts to offer notification to a particular listener.  Each
@@ -71,12 +71,9 @@ public class QueuedNotificationManager<L,N> implements NotificationManager<L,N>
      */
     private static final int MAX_NOTIFICATION_OFFER_ATTEMPTS = 10;
 
+    private final ConcurrentMap<ListenerKey<L>, NotificationTask> listenerCache = new ConcurrentHashMap<>();
+    private final Invoker<L, N> listenerInvoker;
     private final Executor executor;
-    private final Invoker<L,N> listenerInvoker;
-
-    private final ConcurrentMap<ListenerKey<L>,NotificationTask>
-                                                          listenerCache = new ConcurrentHashMap<>();
-
     private final String name;
     private final int maxQueueCapacity;
 
@@ -88,103 +85,84 @@ public class QueuedNotificationManager<L,N> implements NotificationManager<L,N>
      * @param maxQueueCapacity the capacity of each listener queue
      * @param name the name of this instance for logging info
      */
-    public QueuedNotificationManager( Executor executor, Invoker<L,N> listenerInvoker,
-            int maxQueueCapacity, String name ) {
-        this.executor = Preconditions.checkNotNull( executor );
-        this.listenerInvoker = Preconditions.checkNotNull( listenerInvoker );
-        Preconditions.checkArgument( maxQueueCapacity > 0, "maxQueueCapacity must be > 0 " );
+    public QueuedNotificationManager(final Executor executor, final Invoker<L, N> listenerInvoker,
+            final int maxQueueCapacity, final String name) {
+        Preconditions.checkArgument(maxQueueCapacity > 0, "Invalid maxQueueCapacity %s must be > 0", maxQueueCapacity);
+        this.executor = Preconditions.checkNotNull(executor);
+        this.listenerInvoker = Preconditions.checkNotNull(listenerInvoker);
         this.maxQueueCapacity = maxQueueCapacity;
-        this.name = Preconditions.checkNotNull( name );
+        this.name = Preconditions.checkNotNull(name);
     }
 
     /* (non-Javadoc)
      * @see org.opendaylight.yangtools.util.concurrent.NotificationManager#addNotification(L, N)
      */
     @Override
-    public void submitNotification( final L listener, final N notification )
-            throws RejectedExecutionException {
-
-        if (notification == null) {
-            return;
+    public void submitNotification(final L listener, final N notification) throws RejectedExecutionException {
+        if (notification != null) {
+            submitNotifications(listener, Collections.singletonList(notification));
         }
-
-        submitNotifications( listener, Collections.singletonList(notification));
     }
 
     /* (non-Javadoc)
      * @see org.opendaylight.yangtools.util.concurrent.NotificationManager#submitNotifications(L, java.util.Collection)
      */
     @Override
-    public void submitNotifications( final L listener, final Iterable<N> notifications )
+    public void submitNotifications(final L listener, final Iterable<N> notifications)
             throws RejectedExecutionException {
 
         if (notifications == null || listener == null) {
             return;
         }
 
-        if (LOG.isTraceEnabled()) {
-            LOG.trace( "{}: submitNotifications for listener {}: {}",
-                       name, listener.toString(), notifications );
-        }
+        LOG.trace("{}: submitNotifications for listener {}: {}", name, listener, notifications);
 
-        ListenerKey<L> key = new ListenerKey<>( listener );
-        NotificationTask newNotificationTask = null;
+        final ListenerKey<L> key = new ListenerKey<>(listener);
 
         // Keep looping until we are either able to add a new NotificationTask or are able to
         // add our notifications to an existing NotificationTask. Eventually one or the other
         // will occur.
-
         try {
-            while (true) {
-                NotificationTask existingTask = listenerCache.get( key );
-
-                if (existingTask == null || !existingTask.submitNotifications( notifications )) {
-
-                    // Either there's no existing task or we couldn't add our notifications to the
-                    // existing one because it's in the process of exiting and removing itself from
-                    // the cache. Either way try to put a new task in the cache. If we can't put
-                    // then either the existing one is still there and hasn't removed itself quite
-                    // yet or some other concurrent thread beat us to the put although this method
-                    // shouldn't be called concurrently for the same listener as that would violate
-                    // notification ordering. In any case loop back up and try again.
-
-                    if (newNotificationTask == null) {
-                        newNotificationTask = new NotificationTask( key, notifications );
-                    }
-
-                    existingTask = listenerCache.putIfAbsent( key, newNotificationTask );
-                    if (existingTask == null) {
-
-                        // We were able to put our new task - now submit it to the executor and
-                        // we're done. If it throws a RejectedxecutionException, let that propagate
-                        // to the caller.
-
-                        LOG.debug( "{}: Submitting NotificationTask for listener {}",
-                                   name, listener.toString() );
-
-                        executor.execute( newNotificationTask );
-                        break;
-                    }
-                } else {
+            NotificationTask newNotificationTask = null;
 
+            while (true) {
+                final NotificationTask existingTask = listenerCache.get(key);
+                if (existingTask != null && existingTask.submitNotifications(notifications)) {
                     // We were able to add our notifications to an existing task so we're done.
+                    break;
+                }
 
+                // Either there's no existing task or we couldn't add our notifications to the
+                // existing one because it's in the process of exiting and removing itself from
+                // the cache. Either way try to put a new task in the cache. If we can't put
+                // then either the existing one is still there and hasn't removed itself quite
+                // yet or some other concurrent thread beat us to the put although this method
+                // shouldn't be called concurrently for the same listener as that would violate
+                // notification ordering. In any case loop back up and try again.
+
+                if (newNotificationTask == null) {
+                    newNotificationTask = new NotificationTask(key, notifications);
+                }
+                final NotificationTask oldTask = listenerCache.putIfAbsent(key, newNotificationTask);
+                if (oldTask == null) {
+                    // We were able to put our new task - now submit it to the executor and
+                    // we're done. If it throws a RejectedxecutionException, let that propagate
+                    // to the caller.
+
+                    LOG.debug("{}: Submitting NotificationTask for listener {}", name, listener);
+                    executor.execute(newNotificationTask);
                     break;
                 }
+
+                LOG.debug("{}: retrying task queueing for {}", name, listener);
             }
         } catch (InterruptedException e) {
-
             // We were interrupted trying to offer to the listener's queue. Somebody's probably
             // telling us to quit.
-
-            LOG.debug( "{}: Interrupted trying to add to {} listener's queue",
-                       name, listener.toString() );
+            LOG.warn("{}: Interrupted trying to add to {} listener's queue", name, listener);
         }
 
-        if (LOG.isTraceEnabled()) {
-            LOG.trace( "{}: submitNotifications dine for listener {}",
-                       name, listener.toString() );
-        }
+        LOG.trace("{}: submitNotifications dine for listener {}", name, listener);
     }
 
     /**
@@ -192,13 +170,8 @@ public class QueuedNotificationManager<L,N> implements NotificationManager<L,N>
      * notification task in progress.
      */
     public List<ListenerNotificationQueueStats> getListenerNotificationQueueStats() {
-        List<ListenerNotificationQueueStats> statsList = new ArrayList<>( listenerCache.size() );
-        for (NotificationTask task: listenerCache.values()) {
-            statsList.add( new ListenerNotificationQueueStats(
-                    task.listenerKey.toString(), task.notificationQueue.size() ) );
-        }
-
-        return statsList ;
+        return listenerCache.values().stream().map(t -> new ListenerNotificationQueueStats(t.listenerKey.toString(),
+            t.notificationQueue.size())).collect(Collectors.toList());
     }
 
     /**
@@ -222,12 +195,11 @@ public class QueuedNotificationManager<L,N> implements NotificationManager<L,N>
      * equals implementation that just blindly casts the other Object to compare instead of checking
      * for instanceof.
      */
-    private static class ListenerKey<L> {
-
+    private static final class ListenerKey<L> {
         private final L listener;
 
-        ListenerKey( L listener ) {
-            this.listener = listener;
+        ListenerKey(final L listener) {
+            this.listener = Preconditions.checkNotNull(listener);
         }
 
         L getListener() {
@@ -236,20 +208,15 @@ public class QueuedNotificationManager<L,N> implements NotificationManager<L,N>
 
         @Override
         public int hashCode() {
-            return System.identityHashCode( listener );
+            return System.identityHashCode(listener);
         }
 
         @Override
-        public boolean equals( Object obj ) {
+        public boolean equals(final Object obj) {
             if (obj == this) {
                 return true;
             }
-            if (!(obj instanceof ListenerKey<?>)) {
-                return false;
-            }
-
-            ListenerKey<?> other = (ListenerKey<?>) obj;
-            return listener == other.listener;
+            return (obj instanceof ListenerKey<?>) && listener == ((ListenerKey<?>) obj).listener;
         }
 
         @Override
@@ -263,29 +230,48 @@ public class QueuedNotificationManager<L,N> implements NotificationManager<L,N>
      * listener.
      */
     private class NotificationTask implements Runnable {
-
+        private final Lock queuingLock = new ReentrantLock();
         private final BlockingQueue<N> notificationQueue;
-
-        private volatile boolean done = false;
+        private final ListenerKey<L> listenerKey;
 
         @GuardedBy("queuingLock")
         private boolean queuedNotifications = false;
+        private volatile boolean done = false;
 
-        private final Lock queuingLock = new ReentrantLock();
+        NotificationTask(final ListenerKey<L> listenerKey, final Iterable<N> notifications) {
+            this.listenerKey = Preconditions.checkNotNull(listenerKey);
+            this.notificationQueue = new LinkedBlockingQueue<>(maxQueueCapacity);
 
-        private final ListenerKey<L> listenerKey;
+            for (N notification: notifications) {
+                this.notificationQueue.add(notification);
+            }
+        }
 
-        NotificationTask( ListenerKey<L> listenerKey, Iterable<N> notifications ) {
+        @GuardedBy("queuingLock")
+        private void publishNotification(final N notification) throws InterruptedException {
+            // The offer is attempted for up to 10 minutes, with a status message printed each minute
+            for (int notificationOfferAttempts = 0;
+                 notificationOfferAttempts < MAX_NOTIFICATION_OFFER_ATTEMPTS; notificationOfferAttempts++) {
 
-            this.listenerKey = listenerKey;
-            this.notificationQueue = new LinkedBlockingQueue<>( maxQueueCapacity );
+                // Try to offer for up to a minute and log a message if it times out.
+                LOG.debug("{}: Offering notification to the queue for listener {}: {}", name, listenerKey,
+                    notification);
 
-            for (N notification: notifications) {
-                this.notificationQueue.add( notification );
+                if (notificationQueue.offer(notification, 1, TimeUnit.MINUTES)) {
+                    return;
+                }
+
+                LOG.warn("{}: Timed out trying to offer a notification to the queue for listener {} "
+                        + "on attempt {} of {}. The queue has reached its capacity of {}", name, listenerKey,
+                        notificationOfferAttempts, MAX_NOTIFICATION_OFFER_ATTEMPTS, maxQueueCapacity);
             }
+
+            LOG.warn("{}: Failed to offer a notification to the queue for listener {}. Exceeded max allowable attempts"
+                    + " of {} in {} minutes; the listener is likely in an unrecoverable state (deadlock or endless"
+                    + " loop).", name, listenerKey, MAX_NOTIFICATION_OFFER_ATTEMPTS, MAX_NOTIFICATION_OFFER_ATTEMPTS);
         }
 
-        boolean submitNotifications( Iterable<N> notifications ) throws InterruptedException {
+        boolean submitNotifications(final Iterable<N> notifications) throws InterruptedException {
 
             queuingLock.lock();
             try {
@@ -297,37 +283,8 @@ public class QueuedNotificationManager<L,N> implements NotificationManager<L,N>
                     return false;
                 }
 
-                for (N notification: notifications) {
-                    boolean notificationOfferAttemptSuccess = false;
-                    // The offer is attempted for up to 10 minutes, with a status message printed each minute
-                    for (int notificationOfferAttempts = 0;
-                         notificationOfferAttempts < MAX_NOTIFICATION_OFFER_ATTEMPTS; notificationOfferAttempts++) {
-
-                        // Try to offer for up to a minute and log a message if it times out.
-                        if (LOG.isDebugEnabled()) {
-                            LOG.debug( "{}: Offering notification to the queue for listener {}: {}",
-                                       name, listenerKey.toString(), notification );
-                        }
-
-                        if (notificationOfferAttemptSuccess = notificationQueue.offer(
-                                notification, 1, TimeUnit.MINUTES)) {
-                            break;
-                        }
-
-                        LOG.warn(
-                                "{}: Timed out trying to offer a notification to the queue for listener {} "
-                                        + "on attempt {} of {}. " + "The queue has reached its capacity of {}",
-                                name, listenerKey.toString(), notificationOfferAttempts,
-                                MAX_NOTIFICATION_OFFER_ATTEMPTS, maxQueueCapacity);
-                    }
-                    if (!notificationOfferAttemptSuccess) {
-                        LOG.warn(
-                                "{}: Failed to offer a notification to the queue for listener {}. "
-                                        + "Exceeded max allowable attempts of {} in {} minutes; the listener "
-                                        + "is likely in an unrecoverable state (deadlock or endless loop).",
-                                name, listenerKey.toString(), MAX_NOTIFICATION_OFFER_ATTEMPTS,
-                                MAX_NOTIFICATION_OFFER_ATTEMPTS);
-                    }
+                for (N notification : notifications) {
+                    publishNotification(notification);
                 }
 
                 // Set the queuedNotifications flag to tell #run that we've just queued
@@ -335,7 +292,6 @@ public class QueuedNotificationManager<L,N> implements NotificationManager<L,N>
                 // point.
 
                 queuedNotifications = true;
-
             } finally {
                 queuingLock.unlock();
             }
@@ -345,16 +301,14 @@ public class QueuedNotificationManager<L,N> implements NotificationManager<L,N>
 
         @Override
         public void run() {
-
             try {
                 // Loop until we've dispatched all the notifications in the queue.
 
                 while (true) {
-
                     // Get the notification at the head of the queue, waiting a little bit for one
                     // to get offered.
 
-                    N notification = notificationQueue.poll( 10, TimeUnit.MILLISECONDS );
+                    final N notification = notificationQueue.poll(10, TimeUnit.MILLISECONDS);
                     if (notification == null) {
 
                         // The queue is empty - try to get the queuingLock. If we can't get the lock
@@ -386,52 +340,29 @@ public class QueuedNotificationManager<L,N> implements NotificationManager<L,N>
                         }
                     }
 
-                    notifyListener( notification );
+                    notifyListener(notification);
                 }
             } catch (InterruptedException e) {
-
                 // The executor is probably shutting down so log as debug.
-                LOG.debug( "{}: Interrupted trying to remove from {} listener's queue",
-                           name, listenerKey.toString() );
+                LOG.debug("{}: Interrupted trying to remove from {} listener's queue", name, listenerKey);
             } finally {
-
                 // We're exiting, gracefully or not - either way make sure we always remove
                 // ourselves from the cache.
-
-                listenerCache.remove( listenerKey );
+                listenerCache.remove(listenerKey, this);
             }
         }
 
-        private void notifyListener( N notification ) {
-
+        private void notifyListener(final N notification) {
             if (notification == null) {
                 return;
             }
 
+            LOG.debug("{}: Invoking listener {} with notification: {}", name, listenerKey, notification);
             try {
-
-                if (LOG.isDebugEnabled()) {
-                    LOG.debug( "{}: Invoking listener {} with notification: {}",
-                               name, listenerKey.toString(), notification );
-                }
-
-                listenerInvoker.invokeListener( listenerKey.getListener(), notification );
-
-            } catch (RuntimeException e ) {
-
-                // We'll let a RuntimeException from the listener slide and keep sending any
-                // remaining notifications.
-
-                LOG.error( String.format( "%1$s: Error notifying listener %2$s", name,
-                           listenerKey.toString() ), e );
-
-            } catch (Error e) {
-
-                // A JVM Error is severe - best practice is to throw them up the chain. Set done to
-                // true so no new notifications can be added to this task as we're about to bail.
-
-                done = true;
-                throw e;
+                listenerInvoker.invokeListener(listenerKey.getListener(), notification);
+            } catch (Exception e) {
+                // We'll let a RuntimeException from the listener slide and keep sending any remaining notifications.
+                LOG.error(String.format("%1$s: Error notifying listener %2$s", name, listenerKey), e);
             }
         }
     }