import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
+import java.util.ArrayDeque;
import java.util.Collection;
import java.util.Collections;
+import java.util.Iterator;
import java.util.List;
-import java.util.concurrent.BlockingQueue;
+import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executor;
-import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Collectors;
* Caps the maximum number of attempts to offer notification to a particular listener. Each
* attempt window is 1 minute, so an offer times out after roughly 10 minutes.
*/
- private static final int MAX_NOTIFICATION_OFFER_ATTEMPTS = 10;
+ private static final int MAX_NOTIFICATION_OFFER_MINUTES = 10;
+ private static final long GIVE_UP_NANOS = TimeUnit.MINUTES.toNanos(MAX_NOTIFICATION_OFFER_MINUTES);
+ private static final long TASK_WAIT_NANOS = TimeUnit.MILLISECONDS.toNanos(10);
private final ConcurrentMap<ListenerKey<L>, NotificationTask> listenerCache = new ConcurrentHashMap<>();
private final BatchedInvoker<L, N> listenerInvoker;
@Deprecated
public QueuedNotificationManager(final Executor executor, final Invoker<L, N> listenerInvoker,
final int maxQueueCapacity, final String name) {
- this(executor, (BatchedInvoker<L, N>)(l, c) -> c.forEach(n -> listenerInvoker.invokeListener(l, n)),
- maxQueueCapacity, name);
+ this(executor, (BatchedInvoker<L, N>)(l, c) -> c.forEach(n -> {
+ try {
+ listenerInvoker.invokeListener(l, n);
+ } catch (Exception e) {
+ LOG.error("{}: Error notifying listener {} with {}", name, l, n, e);
+ }
+
+ }), maxQueueCapacity, name);
Preconditions.checkNotNull(listenerInvoker);
}
return new QueuedNotificationManager<>(executor, listenerInvoker, maxQueueCapacity, name);
}
+ /**
+ * Returns the maximum listener queue capacity.
+ */
+ public int getMaxQueueCapacity() {
+ return maxQueueCapacity;
+ }
+
+ /**
+ * Returns the {@link Executor} to used for notification tasks.
+ */
+ public Executor getExecutor() {
+ return executor;
+ }
+
/* (non-Javadoc)
* @see org.opendaylight.yangtools.util.concurrent.NotificationManager#addNotification(L, N)
*/
// add our notifications to an existing NotificationTask. Eventually one or the other
// will occur.
try {
- NotificationTask newNotificationTask = null;
+ Iterator<N> it = notifications.iterator();
while (true) {
- final NotificationTask existingTask = listenerCache.get(key);
- if (existingTask != null && existingTask.submitNotifications(notifications)) {
- // We were able to add our notifications to an existing task so we're done.
- break;
+ NotificationTask task = listenerCache.get(key);
+ if (task == null) {
+ // No task found, try to insert a new one
+ final NotificationTask newTask = new NotificationTask(key, it);
+ task = listenerCache.putIfAbsent(key, newTask);
+ if (task == null) {
+ // We were able to put our new task - now submit it to the executor and
+ // we're done. If it throws a RejectedExecutionException, let that propagate
+ // to the caller.
+ runTask(listener, newTask);
+ break;
+ }
+
+ // We have a racing task, hence we can continue, but we need to refresh our iterator from
+ // the task.
+ it = newTask.recoverItems();
}
- // Either there's no existing task or we couldn't add our notifications to the
- // existing one because it's in the process of exiting and removing itself from
- // the cache. Either way try to put a new task in the cache. If we can't put
- // then either the existing one is still there and hasn't removed itself quite
- // yet or some other concurrent thread beat us to the put although this method
- // shouldn't be called concurrently for the same listener as that would violate
- // notification ordering. In any case loop back up and try again.
+ final boolean completed = task.submitNotifications(it);
+ if (!completed) {
+ // Task is indicating it is exiting before it has consumed all the items and is exiting. Rather
+ // than spinning on removal, we try to replace it.
+ final NotificationTask newTask = new NotificationTask(key, it);
+ if (listenerCache.replace(key, task, newTask)) {
+ runTask(listener, newTask);
+ break;
+ }
- if (newNotificationTask == null) {
- newNotificationTask = new NotificationTask(key, notifications);
- }
- final NotificationTask oldTask = listenerCache.putIfAbsent(key, newNotificationTask);
- if (oldTask == null) {
- // We were able to put our new task - now submit it to the executor and
- // we're done. If it throws a RejectedxecutionException, let that propagate
- // to the caller.
-
- LOG.debug("{}: Submitting NotificationTask for listener {}", name, listener);
- executor.execute(newNotificationTask);
- break;
+ // We failed to replace the task, hence we need retry. Note we have to recover the items to be
+ // published from the new task.
+ it = newTask.recoverItems();
+ LOG.debug("{}: retrying task queueing for {}", name, listener);
+ continue;
}
- LOG.debug("{}: retrying task queueing for {}", name, listener);
+ // All notifications have either been delivered or we have timed out and warned about the ones we
+ // have failed to deliver. In any case we are done here.
+ break;
}
} catch (InterruptedException e) {
// We were interrupted trying to offer to the listener's queue. Somebody's probably
LOG.warn("{}: Interrupted trying to add to {} listener's queue", name, listener);
}
- LOG.trace("{}: submitNotifications dine for listener {}", name, listener);
+ LOG.trace("{}: submitNotifications done for listener {}", name, listener);
}
/**
*/
public List<ListenerNotificationQueueStats> getListenerNotificationQueueStats() {
return listenerCache.values().stream().map(t -> new ListenerNotificationQueueStats(t.listenerKey.toString(),
- t.notificationQueue.size())).collect(Collectors.toList());
+ t.size())).collect(Collectors.toList());
}
- /**
- * Returns the maximum listener queue capacity.
- */
- public int getMaxQueueCapacity() {
- return maxQueueCapacity;
- }
-
- /**
- * Returns the {@link Executor} to used for notification tasks.
- */
- public Executor getExecutor() {
- return executor;
+ private void runTask(final L listener, final NotificationTask task) {
+ LOG.debug("{}: Submitting NotificationTask for listener {}", name, listener);
+ executor.execute(task);
}
/**
* listener.
*/
private class NotificationTask implements Runnable {
- private final Lock queuingLock = new ReentrantLock();
- private final BlockingQueue<N> notificationQueue;
+
+ private final Lock lock = new ReentrantLock();
+ private final Condition notEmpty = lock.newCondition();
+ private final Condition notFull = lock.newCondition();
private final ListenerKey<L> listenerKey;
- @GuardedBy("queuingLock")
- private boolean queuedNotifications = false;
- private volatile boolean done = false;
+ @GuardedBy("lock")
+ private final Queue<N> queue = new ArrayDeque<>();
+ @GuardedBy("lock")
+ private boolean exiting;
- NotificationTask(final ListenerKey<L> listenerKey, final Iterable<N> notifications) {
+ NotificationTask(final ListenerKey<L> listenerKey, final Iterator<N> notifications) {
this.listenerKey = Preconditions.checkNotNull(listenerKey);
- this.notificationQueue = new LinkedBlockingQueue<>(maxQueueCapacity);
+ while (notifications.hasNext()) {
+ queue.offer(notifications.next());
+ }
+ }
- for (N notification: notifications) {
- this.notificationQueue.add(notification);
+ Iterator<N> recoverItems() {
+ // This violates @GuardedBy annotation, but is invoked only when the task is not started and will never
+ // get started, hence this is safe.
+ return queue.iterator();
+ }
+
+ int size() {
+ lock.lock();
+ try {
+ return queue.size();
+ } finally {
+ lock.unlock();
}
}
- @GuardedBy("queuingLock")
- private void publishNotification(final N notification) throws InterruptedException {
- // The offer is attempted for up to 10 minutes, with a status message printed each minute
- for (int notificationOfferAttempts = 0;
- notificationOfferAttempts < MAX_NOTIFICATION_OFFER_ATTEMPTS; notificationOfferAttempts++) {
+ boolean submitNotifications(final Iterator<N> notifications) throws InterruptedException {
+ final long start = System.nanoTime();
+ final long deadline = start + GIVE_UP_NANOS;
- // Try to offer for up to a minute and log a message if it times out.
- LOG.debug("{}: Offering notification to the queue for listener {}: {}", name, listenerKey,
- notification);
+ lock.lock();
+ try {
+ // Lock may have blocked for some time, we need to take that into account. We may have exceedded
+ // the deadline, but that is unlikely and even in that case we can make some progress without further
+ // blocking.
+ long canWait = deadline - System.nanoTime();
- if (notificationQueue.offer(notification, 1, TimeUnit.MINUTES)) {
- return;
- }
+ while (true) {
+ // Check the exiting flag - if true then #run is in the process of exiting so return
+ // false to indicate such. Otherwise, offer the notifications to the queue.
+ if (exiting) {
+ return false;
+ }
- LOG.warn("{}: Timed out trying to offer a notification to the queue for listener {} "
- + "on attempt {} of {}. The queue has reached its capacity of {}", name, listenerKey,
- notificationOfferAttempts, MAX_NOTIFICATION_OFFER_ATTEMPTS, maxQueueCapacity);
- }
+ final int avail = maxQueueCapacity - queue.size();
+ if (avail <= 0) {
+ if (canWait <= 0) {
+ LOG.warn("{}: Failed to offer notifications {} to the queue for listener {}. Exceeded"
+ + "maximum allowable time of {} minutes; the listener is likely in an unrecoverable"
+ + "state (deadlock or endless loop). ", name, ImmutableList.copyOf(notifications),
+ listenerKey, MAX_NOTIFICATION_OFFER_MINUTES);
+ return true;
+ }
- LOG.warn("{}: Failed to offer a notification to the queue for listener {}. Exceeded max allowable attempts"
- + " of {} in {} minutes; the listener is likely in an unrecoverable state (deadlock or endless"
- + " loop).", name, listenerKey, MAX_NOTIFICATION_OFFER_ATTEMPTS, MAX_NOTIFICATION_OFFER_ATTEMPTS);
- }
+ canWait = notFull.awaitNanos(canWait);
+ continue;
+ }
- boolean submitNotifications(final Iterable<N> notifications) throws InterruptedException {
+ for (int i = 0; i < avail; ++i) {
+ if (!notifications.hasNext()) {
+ notEmpty.signal();
+ return true;
+ }
- queuingLock.lock();
- try {
+ queue.offer(notifications.next());
+ }
+ }
+ } finally {
+ lock.unlock();
+ }
+ }
- // Check the done flag - if true then #run is in the process of exiting so return
- // false to indicate such. Otherwise, offer the notifications to the queue.
+ @GuardedBy("lock")
+ private boolean waitForQueue() {
+ long timeout = TASK_WAIT_NANOS;
- if (done) {
+ while (queue.isEmpty()) {
+ if (timeout <= 0) {
return false;
}
- for (N notification : notifications) {
- publishNotification(notification);
+ try {
+ timeout = notEmpty.awaitNanos(timeout);
+ } catch (InterruptedException e) {
+ // The executor is probably shutting down so log as debug.
+ LOG.debug("{}: Interrupted trying to remove from {} listener's queue", name, listenerKey);
+ return false;
}
-
- // Set the queuedNotifications flag to tell #run that we've just queued
- // notifications and not to exit yet, even if it thinks the queue is empty at this
- // point.
-
- queuedNotifications = true;
- } finally {
- queuingLock.unlock();
}
return true;
public void run() {
try {
// Loop until we've dispatched all the notifications in the queue.
-
while (true) {
- // Get the notification at the head of the queue, waiting a little bit for one
- // to get offered.
-
- final N notification = notificationQueue.poll(10, TimeUnit.MILLISECONDS);
- if (notification == null) {
+ final Collection<N> notifications;
- // The queue is empty - try to get the queuingLock. If we can't get the lock
- // then #submitNotifications is in the process of offering to the queue so
- // we'll loop back up and poll the queue again.
-
- if (queuingLock.tryLock()) {
- try {
-
- // Check the queuedNotifications flag to see if #submitNotifications
- // has offered new notification(s) to the queue. If so, loop back up
- // and poll the queue again. Otherwise set done to true and exit.
- // Once we set the done flag and unlock, calls to
- // #submitNotifications will fail and a new task will be created.
-
- if (!queuedNotifications) {
- done = true;
- break;
- }
-
- // Clear the queuedNotifications flag so we'll try to exit the next
- // time through the loop when the queue is empty.
+ lock.lock();
+ try {
+ if (!waitForQueue()) {
+ exiting = true;
+ break;
+ }
- queuedNotifications = false;
+ // Splice the entire queue
+ notifications = ImmutableList.copyOf(queue);
+ queue.clear();
- } finally {
- queuingLock.unlock();
- }
- }
+ notFull.signalAll();
+ } finally {
+ lock.unlock();
}
- notifyListener(notification);
+ invokeListener(notifications);
}
- } catch (InterruptedException e) {
- // The executor is probably shutting down so log as debug.
- LOG.debug("{}: Interrupted trying to remove from {} listener's queue", name, listenerKey);
} finally {
// We're exiting, gracefully or not - either way make sure we always remove
// ourselves from the cache.
}
}
- private void notifyListener(final N notification) {
- if (notification == null) {
- return;
- }
-
- LOG.debug("{}: Invoking listener {} with notification: {}", name, listenerKey, notification);
+ private void invokeListener(final Collection<N> notifications) {
+ LOG.debug("{}: Invoking listener {} with notification: {}", name, listenerKey, notifications);
try {
- listenerInvoker.invokeListener(listenerKey.getListener(), ImmutableList.of(notification));
+ listenerInvoker.invokeListener(listenerKey.getListener(), notifications);
} catch (Exception e) {
// We'll let a RuntimeException from the listener slide and keep sending any remaining notifications.
- LOG.error(String.format("%1$s: Error notifying listener %2$s", name, listenerKey), e);
+ LOG.error("{}: Error notifying listener {} with {}", name, listenerKey, notifications, e);
}
}
}