private static final Logger LOG = LoggerFactory.getLogger( QueuedNotificationManager.class );
+ /**
+ * Caps the maximum number of attempts to offer notification to a particular listener. Each
+ * attempt window is 1 minute, so an offer times out after roughly 10 minutes.
+ */
+ private static final int MAX_NOTIFICATION_OFFER_ATTEMPTS = 10;
+
private final Executor executor;
private final Invoker<L,N> listenerInvoker;
}
for( N notification: notifications ) {
-
- while( true ) {
+ boolean notificationOfferAttemptSuccess = false;
+ // The offer is attempted for up to 10 minutes, with a status message printed each minute
+ for (int notificationOfferAttempts = 0;
+ notificationOfferAttempts < MAX_NOTIFICATION_OFFER_ATTEMPTS; notificationOfferAttempts++ ) {
// Try to offer for up to a minute and log a message if it times out.
-
- // FIXME: we loop forever to guarantee delivery however this leaves it open
- // for 1 rogue listener to bring everyone to a halt. Another option is to
- // limit the tries and give up after a while and drop the notification.
- // Given a reasonably large queue capacity and long timeout, if we still
- // can't queue then most likely the listener is an unrecoverable state
- // (deadlock or endless loop).
-
if( LOG.isDebugEnabled() ) {
LOG.debug( "{}: Offering notification to the queue for listener {}: {}",
name, listenerKey.toString(), notification );
}
- if( notificationQueue.offer( notification, 1, TimeUnit.MINUTES ) ) {
+ if( notificationOfferAttemptSuccess = notificationQueue.offer(
+ notification, 1, TimeUnit.MINUTES ) ) {
break;
}
LOG.warn(
- "{}: Timed out trying to offer a notification to the queue for listener {}." +
+ "{}: Timed out trying to offer a notification to the queue for listener {} " +
+ "on attempt {} of {}. " +
"The queue has reached its capacity of {}",
- name, listenerKey.toString(), maxQueueCapacity );
+ name, listenerKey.toString(), notificationOfferAttempts, MAX_NOTIFICATION_OFFER_ATTEMPTS,
+ maxQueueCapacity );
+ }
+ if (!notificationOfferAttemptSuccess) {
+ LOG.warn(
+ "{}: Failed to offer a notification to the queue for listener {}. " +
+ "Exceeded max allowable attempts of {} in {} minutes; the listener " +
+ "is likely in an unrecoverable state (deadlock or endless loop).",
+ name, listenerKey.toString(), MAX_NOTIFICATION_OFFER_ATTEMPTS,
+ MAX_NOTIFICATION_OFFER_ATTEMPTS );
}
}