Organize Imports to be Checkstyle compliant in utils
[yangtools.git] / common / util / src / main / java / org / opendaylight / yangtools / util / concurrent / QueuedNotificationManager.java
index 27c81a1ee1969436c03a1405a81544c32109e6e0..09d867b15c58551672cf0f7ff2534aa0215624e8 100644 (file)
@@ -8,8 +8,9 @@
 
 package org.opendaylight.yangtools.util.concurrent;
 
+import com.google.common.base.Preconditions;
 import java.util.ArrayList;
-import java.util.Arrays;
+import java.util.Collections;
 import java.util.List;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ConcurrentHashMap;
@@ -20,14 +21,10 @@ import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
-
 import javax.annotation.concurrent.GuardedBy;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.common.base.Preconditions;
-
 /**
  * This class manages queuing and dispatching notifications for multiple listeners concurrently.
  * Notifications are queued on a per-listener basis and dispatched serially to each listener via an
@@ -68,6 +65,12 @@ public class QueuedNotificationManager<L,N> implements NotificationManager<L,N>
 
     private static final Logger LOG = LoggerFactory.getLogger( QueuedNotificationManager.class );
 
+    /**
+     * Caps the maximum number of attempts to offer notification to a particular listener.  Each
+     * attempt window is 1 minute, so an offer times out after roughly 10 minutes.
+     */
+    private static final int MAX_NOTIFICATION_OFFER_ATTEMPTS = 10;
+
     private final Executor executor;
     private final Invoker<L,N> listenerInvoker;
 
@@ -101,11 +104,11 @@ public class QueuedNotificationManager<L,N> implements NotificationManager<L,N>
     public void submitNotification( final L listener, final N notification )
             throws RejectedExecutionException {
 
-        if( notification == null ) {
+        if (notification == null) {
             return;
         }
 
-        submitNotifications( listener, Arrays.asList( notification ) );
+        submitNotifications( listener, Collections.singletonList(notification));
     }
 
     /* (non-Javadoc)
@@ -115,13 +118,13 @@ public class QueuedNotificationManager<L,N> implements NotificationManager<L,N>
     public void submitNotifications( final L listener, final Iterable<N> notifications )
             throws RejectedExecutionException {
 
-        if( notifications == null || listener == null ) {
+        if (notifications == null || listener == null) {
             return;
         }
 
-        if( LOG.isTraceEnabled() ) {
+        if (LOG.isTraceEnabled()) {
             LOG.trace( "{}: submitNotifications for listener {}: {}",
-                       name, listener.getClass(), notifications );
+                       name, listener.toString(), notifications );
         }
 
         ListenerKey<L> key = new ListenerKey<>( listener );
@@ -132,10 +135,10 @@ public class QueuedNotificationManager<L,N> implements NotificationManager<L,N>
         // will occur.
 
         try {
-            while( true ) {
+            while (true) {
                 NotificationTask existingTask = listenerCache.get( key );
 
-                if( existingTask == null || !existingTask.submitNotifications( notifications ) ) {
+                if (existingTask == null || !existingTask.submitNotifications( notifications )) {
 
                     // Either there's no existing task or we couldn't add our notifications to the
                     // existing one because it's in the process of exiting and removing itself from
@@ -145,19 +148,19 @@ public class QueuedNotificationManager<L,N> implements NotificationManager<L,N>
                     // shouldn't be called concurrently for the same listener as that would violate
                     // notification ordering. In any case loop back up and try again.
 
-                    if( newNotificationTask == null ) {
+                    if (newNotificationTask == null) {
                         newNotificationTask = new NotificationTask( key, notifications );
                     }
 
                     existingTask = listenerCache.putIfAbsent( key, newNotificationTask );
-                    if( existingTask == null ) {
+                    if (existingTask == null) {
 
                         // We were able to put our new task - now submit it to the executor and
                         // we're done. If it throws a RejectedxecutionException, let that propagate
                         // to the caller.
 
                         LOG.debug( "{}: Submitting NotificationTask for listener {}",
-                                   name, listener.getClass() );
+                                   name, listener.toString() );
 
                         executor.execute( newNotificationTask );
                         break;
@@ -175,12 +178,12 @@ public class QueuedNotificationManager<L,N> implements NotificationManager<L,N>
             // telling us to quit.
 
             LOG.debug( "{}: Interrupted trying to add to {} listener's queue",
-                       name, listener.getClass() );
+                       name, listener.toString() );
         }
 
-        if( LOG.isTraceEnabled() ) {
+        if (LOG.isTraceEnabled()) {
             LOG.trace( "{}: submitNotifications dine for listener {}",
-                       name, listener.getClass() );
+                       name, listener.toString() );
         }
     }
 
@@ -190,10 +193,9 @@ public class QueuedNotificationManager<L,N> implements NotificationManager<L,N>
      */
     public List<ListenerNotificationQueueStats> getListenerNotificationQueueStats() {
         List<ListenerNotificationQueueStats> statsList = new ArrayList<>( listenerCache.size() );
-        for( NotificationTask task: listenerCache.values() ) {
+        for (NotificationTask task: listenerCache.values()) {
             statsList.add( new ListenerNotificationQueueStats(
-                    task.listenerKey.getListener().getClass().getName(),
-                    task.notificationQueue.size() ) );
+                    task.listenerKey.toString(), task.notificationQueue.size() ) );
         }
 
         return statsList ;
@@ -249,6 +251,11 @@ public class QueuedNotificationManager<L,N> implements NotificationManager<L,N>
             ListenerKey<?> other = (ListenerKey<?>) obj;
             return listener == other.listener;
         }
+
+        @Override
+        public String toString() {
+            return listener.toString();
+        }
     }
 
     /**
@@ -273,7 +280,7 @@ public class QueuedNotificationManager<L,N> implements NotificationManager<L,N>
             this.listenerKey = listenerKey;
             this.notificationQueue = new LinkedBlockingQueue<>( maxQueueCapacity );
 
-            for( N notification: notifications ) {
+            for (N notification: notifications) {
                 this.notificationQueue.add( notification );
             }
         }
@@ -286,36 +293,41 @@ public class QueuedNotificationManager<L,N> implements NotificationManager<L,N>
                 // Check the done flag - if true then #run is in the process of exiting so return
                 // false to indicate such. Otherwise, offer the notifications to the queue.
 
-                if( done ) {
+                if (done) {
                     return false;
                 }
 
-                for( N notification: notifications ) {
-
-                    while( true ) {
+                for (N notification: notifications) {
+                    boolean notificationOfferAttemptSuccess = false;
+                    // The offer is attempted for up to 10 minutes, with a status message printed each minute
+                    for (int notificationOfferAttempts = 0;
+                         notificationOfferAttempts < MAX_NOTIFICATION_OFFER_ATTEMPTS; notificationOfferAttempts++) {
 
                         // Try to offer for up to a minute and log a message if it times out.
-
-                        // FIXME: we loop forever to guarantee delivery however this leaves it open
-                        // for 1 rogue listener to bring everyone to a halt. Another option is to
-                        // limit the tries and give up after a while and drop the notification.
-                        // Given a reasonably large queue capacity and long timeout, if we still
-                        // can't queue then most likely the listener is an unrecoverable state
-                        // (deadlock or endless loop).
-
-                        if( LOG.isDebugEnabled() ) {
+                        if (LOG.isDebugEnabled()) {
                             LOG.debug( "{}: Offering notification to the queue for listener {}: {}",
-                                       name, listenerKey.getListener().getClass(), notification );
+                                       name, listenerKey.toString(), notification );
                         }
 
-                        if( notificationQueue.offer( notification, 1, TimeUnit.MINUTES ) ) {
+                        if (notificationOfferAttemptSuccess = notificationQueue.offer(
+                                notification, 1, TimeUnit.MINUTES)) {
                             break;
                         }
 
                         LOG.warn(
-                            "{}: Timed out trying to offer a notification to the queue for listener {}." +
+                            "{}: Timed out trying to offer a notification to the queue for listener {} " +
+                            "on attempt {} of {}. " +
                             "The queue has reached its capacity of {}",
-                            name, listenerKey.getListener().getClass(), maxQueueCapacity );
+                            name, listenerKey.toString(), notificationOfferAttempts, MAX_NOTIFICATION_OFFER_ATTEMPTS,
+                            maxQueueCapacity );
+                    }
+                    if (!notificationOfferAttemptSuccess) {
+                        LOG.warn(
+                            "{}: Failed to offer a notification to the queue for listener {}. " +
+                            "Exceeded max allowable attempts of {} in {} minutes; the listener " +
+                            "is likely in an unrecoverable state (deadlock or endless loop).",
+                            name, listenerKey.toString(), MAX_NOTIFICATION_OFFER_ATTEMPTS,
+                            MAX_NOTIFICATION_OFFER_ATTEMPTS );
                     }
                 }
 
@@ -338,19 +350,19 @@ public class QueuedNotificationManager<L,N> implements NotificationManager<L,N>
             try {
                 // Loop until we've dispatched all the notifications in the queue.
 
-                while( true ) {
+                while (true) {
 
                     // Get the notification at the head of the queue, waiting a little bit for one
                     // to get offered.
 
                     N notification = notificationQueue.poll( 10, TimeUnit.MILLISECONDS );
-                    if( notification == null ) {
+                    if (notification == null) {
 
                         // The queue is empty - try to get the queuingLock. If we can't get the lock
                         // then #submitNotifications is in the process of offering to the queue so
                         // we'll loop back up and poll the queue again.
 
-                        if( queuingLock.tryLock() ) {
+                        if (queuingLock.tryLock()) {
                             try {
 
                                 // Check the queuedNotifications flag to see if #submitNotifications
@@ -359,7 +371,7 @@ public class QueuedNotificationManager<L,N> implements NotificationManager<L,N>
                                 // Once we set the done flag and unlock, calls to
                                 // #submitNotifications will fail and a new task will be created.
 
-                                if( !queuedNotifications ) {
+                                if (!queuedNotifications) {
                                     done = true;
                                     break;
                                 }
@@ -381,7 +393,7 @@ public class QueuedNotificationManager<L,N> implements NotificationManager<L,N>
 
                 // The executor is probably shutting down so log as debug.
                 LOG.debug( "{}: Interrupted trying to remove from {} listener's queue",
-                           name, listenerKey.getListener().getClass() );
+                           name, listenerKey.toString() );
             } finally {
 
                 // We're exiting, gracefully or not - either way make sure we always remove
@@ -393,15 +405,15 @@ public class QueuedNotificationManager<L,N> implements NotificationManager<L,N>
 
         private void notifyListener( N notification ) {
 
-            if( notification == null ) {
+            if (notification == null) {
                 return;
             }
 
             try {
 
-                if( LOG.isDebugEnabled() ) {
+                if (LOG.isDebugEnabled()) {
                     LOG.debug( "{}: Invoking listener {} with notification: {}",
-                               name, listenerKey.getListener().getClass(), notification );
+                               name, listenerKey.toString(), notification );
                 }
 
                 listenerInvoker.invokeListener( listenerKey.getListener(), notification );
@@ -412,7 +424,7 @@ public class QueuedNotificationManager<L,N> implements NotificationManager<L,N>
                 // remaining notifications.
 
                 LOG.error( String.format( "%1$s: Error notifying listener %2$s", name,
-                           listenerKey.getListener().getClass() ), e );
+                           listenerKey.toString() ), e );
 
             } catch( Error e ) {