--- /dev/null
+/*
+ * Copyright (c) 2014 Brocade Communications Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.yangtools.util.concurrent;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static java.util.Objects.requireNonNull;
+
+import com.google.common.collect.ImmutableList;
+import java.util.ArrayDeque;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.Executor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.stream.Stream;
+import org.checkerframework.checker.lock.qual.GuardedBy;
+import org.eclipse.jdt.annotation.NonNull;
+import org.opendaylight.yangtools.concepts.AbstractIdentifiable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This class manages queuing and dispatching tasks for multiple workers concurrently. Tasks are queued on a per-worker
+ * basis and dispatched serially to each worker via an {@link Executor}.
+ *
+ * <p>This class optimizes its memory footprint by only allocating and maintaining a queue and executor
+ * task for a worker when there are pending tasks. On the first task(s), a queue is created and a dispatcher task is
+ * submitted to the executor to dispatch the queue to the associated worker. Any subsequent tasks that occur before all
+ * previous tasks have been dispatched are appended to the existing queue. When all tasks have been dispatched,
+ * the queue and dispatcher task are discarded.
+ *
+ * @author Thomas Pantelis
+ * @author Robert Varga
+ *
+ * @param <K> worker key type
+ * @param <T> task type
+ */
+abstract class AbstractBatchingExecutor<K, T> extends AbstractIdentifiable<String> {
+ private static final Logger LOG = LoggerFactory.getLogger(AbstractBatchingExecutor.class);
+
+ /**
+ * Caps the maximum number of attempts to offer a task to a particular worker. Each attempt window is 1 minute, so
+ * an offer times out after roughly 10 minutes.
+ */
+ private static final int MAX_NOTIFICATION_OFFER_MINUTES = 10;
+ private static final long GIVE_UP_NANOS = TimeUnit.MINUTES.toNanos(MAX_NOTIFICATION_OFFER_MINUTES);
+ private static final long TASK_WAIT_NANOS = TimeUnit.MILLISECONDS.toNanos(10);
+
+ private final ConcurrentMap<K, DispatcherTask> dispatcherTasks = new ConcurrentHashMap<>();
+ private final @NonNull Executor executor;
+ private final int maxQueueCapacity;
+
+ AbstractBatchingExecutor(final @NonNull String name, final @NonNull Executor executor, final int maxQueueCapacity) {
+ super(name);
+ this.executor = requireNonNull(executor);
+ checkArgument(maxQueueCapacity > 0, "Invalid maxQueueCapacity %s must be > 0", maxQueueCapacity);
+ this.maxQueueCapacity = maxQueueCapacity;
+ }
+
+ /**
+ * Returns the maximum worker queue capacity.
+ */
+ final int maxQueueCapacity() {
+ return maxQueueCapacity;
+ }
+
+ /**
+ * Returns the {@link Executor} to used for dispatcher tasks.
+ */
+ final @NonNull Executor executor() {
+ return executor;
+ }
+
+ // FIXME: YANGTOOLS-1016: allow explicit blocking control
+ final void submitTask(final K key, final T task) {
+ submitTasks(key, Collections.singletonList(requireNonNull(task)));
+ }
+
+ // FIXME: YANGTOOLS-1016: allow explicit blocking control with return of un-enqueued tasks (or removal from input)
+ final void submitTasks(final K key, final Iterable<T> tasks) {
+ if (tasks == null || key == null) {
+ return;
+ }
+
+ LOG.trace("{}: submitTasks for worker {}: {}", getIdentifier(), key, tasks);
+
+ // Keep looping until we are either able to add a new DispatcherTask or are able to add our tasks to an existing
+ // DispatcherTask. Eventually one or the other will occur.
+ try {
+ Iterator<T> it = tasks.iterator();
+
+ while (true) {
+ DispatcherTask task = dispatcherTasks.get(key);
+ if (task == null) {
+ // No task found, try to insert a new one
+ final DispatcherTask newTask = new DispatcherTask(key, it);
+ task = dispatcherTasks.putIfAbsent(key, newTask);
+ if (task == null) {
+ // We were able to put our new task - now submit it to the executor and we're done. If it throws
+ // a RejectedExecutionException, let that propagate to the caller.
+ runTask(key, newTask);
+ break;
+ }
+
+ // We have a racing task, hence we can continue, but we need to refresh our iterator from the task.
+ it = newTask.recoverItems();
+ }
+
+ final boolean completed = task.submitTasks(it);
+ if (!completed) {
+ // Task is indicating it is exiting before it has consumed all the items and is exiting. Rather
+ // than spinning on removal, we try to replace it.
+ final DispatcherTask newTask = new DispatcherTask(key, it);
+ if (dispatcherTasks.replace(key, task, newTask)) {
+ runTask(key, newTask);
+ break;
+ }
+
+ // We failed to replace the task, hence we need retry. Note we have to recover the items to be
+ // published from the new task.
+ it = newTask.recoverItems();
+ LOG.debug("{}: retrying task queueing for {}", getIdentifier(), key);
+ continue;
+ }
+
+ // All tasks have either been delivered or we have timed out and warned about the ones we have failed
+ // to deliver. In any case we are done here.
+ break;
+ }
+ } catch (InterruptedException e) {
+ // We were interrupted trying to offer to the worker's queue. Somebody's probably telling us to quit.
+ LOG.warn("{}: Interrupted trying to add to {} worker's queue", getIdentifier(), key);
+ }
+
+ LOG.trace("{}: submitTasks done for worker {}", getIdentifier(), key);
+ }
+
+ final Stream<DispatcherTask> streamTasks() {
+ return dispatcherTasks.values().stream();
+ }
+
+ abstract void executeBatch(K key, @NonNull ImmutableList<T> tasks) throws Exception;
+
+ private void runTask(final K key, final DispatcherTask task) {
+ LOG.debug("{}: Submitting DispatcherTask for worker {}", getIdentifier(), key);
+ executor.execute(task);
+ }
+
+ /**
+ * Executor task for a single worker that queues tasks and sends them serially to the worker.
+ */
+ final class DispatcherTask implements Runnable {
+ private final Lock lock = new ReentrantLock();
+ private final Condition notEmpty = lock.newCondition();
+ private final Condition notFull = lock.newCondition();
+ private final @NonNull K key;
+
+ @GuardedBy("lock")
+ private final Queue<T> queue = new ArrayDeque<>();
+ @GuardedBy("lock")
+ private boolean exiting;
+
+ DispatcherTask(final @NonNull K key, final @NonNull Iterator<T> tasks) {
+ this.key = requireNonNull(key);
+ while (tasks.hasNext()) {
+ final T task = tasks.next();
+ if (task != null) {
+ queue.add(task);
+ }
+ }
+ }
+
+ @NonNull Iterator<T> recoverItems() {
+ // This violates @GuardedBy annotation, but is invoked only when the task is not started and will never
+ // get started, hence this is safe.
+ return queue.iterator();
+ }
+
+ @NonNull K key() {
+ return key;
+ }
+
+ int size() {
+ lock.lock();
+ try {
+ return queue.size();
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ boolean submitTasks(final @NonNull Iterator<T> tasks) throws InterruptedException {
+ final long start = System.nanoTime();
+ final long deadline = start + GIVE_UP_NANOS;
+
+ lock.lock();
+ try {
+ // Lock may have blocked for some time, we need to take that into account. We may have exceeded
+ // the deadline, but that is unlikely and even in that case we can make some progress without further
+ // blocking.
+ long canWait = deadline - System.nanoTime();
+
+ while (true) {
+ // Check the exiting flag - if true then #run is in the process of exiting so return false
+ // to indicate such. Otherwise, offer the tasks to the queue.
+ if (exiting) {
+ return false;
+ }
+
+ final int avail = maxQueueCapacity - queue.size();
+ if (avail <= 0) {
+ if (canWait <= 0) {
+ LOG.warn("{}: Failed to offer tasks {} to the queue for worker {}. Exceeded "
+ + "maximum allowable time of {} minutes; the worker is likely in an unrecoverable "
+ + "state (deadlock or endless loop). ", getIdentifier(), ImmutableList.copyOf(tasks),
+ key, MAX_NOTIFICATION_OFFER_MINUTES);
+ return true;
+ }
+
+ canWait = notFull.awaitNanos(canWait);
+ continue;
+ }
+
+ for (int i = 0; i < avail; ++i) {
+ if (!tasks.hasNext()) {
+ notEmpty.signal();
+ return true;
+ }
+
+ queue.add(tasks.next());
+ }
+ }
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ @GuardedBy("lock")
+ private boolean waitForQueue() {
+ long timeout = TASK_WAIT_NANOS;
+
+ while (queue.isEmpty()) {
+ if (timeout <= 0) {
+ return false;
+ }
+
+ try {
+ timeout = notEmpty.awaitNanos(timeout);
+ } catch (InterruptedException e) {
+ // The executor is probably shutting down so log as debug.
+ LOG.debug("{}: Interrupted trying to remove from {} worker's queue", getIdentifier(), key);
+ return false;
+ }
+ }
+
+ return true;
+ }
+
+ @Override
+ public void run() {
+ try {
+ // Loop until we've dispatched all the tasks in the queue.
+ while (true) {
+ final @NonNull ImmutableList<T> tasks;
+
+ lock.lock();
+ try {
+ if (!waitForQueue()) {
+ exiting = true;
+ break;
+ }
+
+ // Splice the entire queue
+ tasks = ImmutableList.copyOf(queue);
+ queue.clear();
+
+ notFull.signalAll();
+ } finally {
+ lock.unlock();
+ }
+
+ invokeWorker(tasks);
+ }
+ } finally {
+ // We're exiting, gracefully or not - either way make sure we always remove
+ // ourselves from the cache.
+ dispatcherTasks.remove(key, this);
+ }
+ }
+
+ @SuppressWarnings("checkstyle:illegalCatch")
+ private void invokeWorker(final @NonNull ImmutableList<T> tasks) {
+ LOG.debug("{}: Invoking worker {} with tasks: {}", getIdentifier(), key, tasks);
+ try {
+ executeBatch(key, tasks);
+ } catch (Exception e) {
+ // We'll let a RuntimeException from the worker slide and keep sending any remaining tasks.
+ LOG.error("{}: Error invoking worker {} with {}", getIdentifier(), key, tasks, e);
+ }
+ }
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2014 Brocade Communications Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.yangtools.util.concurrent;
+
+import static java.util.Objects.requireNonNull;
+
+import com.google.common.collect.ImmutableList;
+import java.util.List;
+import java.util.concurrent.Executor;
+import java.util.stream.Collectors;
+import org.eclipse.jdt.annotation.NonNull;
+import org.eclipse.jdt.annotation.NonNullByDefault;
+import org.eclipse.jdt.annotation.Nullable;
+import org.opendaylight.yangtools.util.concurrent.QueuedNotificationManager.BatchedInvoker;
+
+/**
+ * This class manages queuing and dispatching notifications for multiple listeners concurrently.
+ * Notifications are queued on a per-listener basis and dispatched serially to each listener via an
+ * {@link Executor}.
+ *
+ * <p>This class optimizes its memory footprint by only allocating and maintaining a queue and executor
+ * task for a listener when there are pending notifications. On the first notification(s), a queue
+ * is created and a task is submitted to the executor to dispatch the queue to the associated
+ * listener. Any subsequent notifications that occur before all previous notifications have been
+ * dispatched are appended to the existing queue. When all notifications have been dispatched, the
+ * queue and task are discarded.
+ *
+ * @author Thomas Pantelis
+ *
+ * @param <L> the listener type
+ * @param <N> the notification type
+ */
+@NonNullByDefault
+abstract class AbstractQueuedNotificationManager<T, L, N> extends AbstractBatchingExecutor<T, N>
+ implements NotificationManager<L, N> {
+
+ private final QueuedNotificationManagerMXBean mxBean = new QueuedNotificationManagerMXBeanImpl(this);
+ private final BatchedInvoker<L, N> listenerInvoker;
+
+ AbstractQueuedNotificationManager(final String name, final Executor executor, final int maxQueueCapacity,
+ final BatchedInvoker<L, N> listenerInvoker) {
+ super(name, executor, maxQueueCapacity);
+ this.listenerInvoker = requireNonNull(listenerInvoker);
+ }
+
+ /**
+ * Returns the {@link Executor} to used for notification tasks.
+ */
+ public final Executor getExecutor() {
+ return executor();
+ }
+
+ /**
+ * Returns the maximum listener queue capacity.
+ */
+ public final int getMaxQueueCapacity() {
+ return maxQueueCapacity();
+ }
+
+ /**
+ * Return an {@link QueuedNotificationManagerMXBean} tied to this instance.
+ *
+ * @return An QueuedNotificationManagerMXBean object.
+ */
+ public final QueuedNotificationManagerMXBean getMXBean() {
+ return mxBean;
+ }
+
+ /**
+ * Returns {@link ListenerNotificationQueueStats} instances for each current listener
+ * notification task in progress.
+ */
+ // FIXME: drop visibility to package-protected
+ public final List<ListenerNotificationQueueStats> getListenerNotificationQueueStats() {
+ return streamTasks().map(t -> new ListenerNotificationQueueStats(t.key().toString(), t.size()))
+ .collect(Collectors.toList());
+ }
+
+ @Override
+ public final void submitNotification(final L listener, final N notification) {
+ if (listener != null && notification != null) {
+ submitTask(wrap(listener), notification);
+ }
+ }
+
+ @Override
+ public final void submitNotifications(final L listener, final @Nullable Iterable<N> notifications) {
+ if (listener != null && notifications != null) {
+ submitTasks(wrap(listener), notifications);
+ }
+ }
+
+ @Override
+ final void executeBatch(final T key, final @NonNull ImmutableList<N> tasks) {
+ listenerInvoker.invokeListener(unwrap(key), tasks);
+ }
+
+ abstract T wrap(L listener);
+
+ abstract L unwrap(T key);
+}
--- /dev/null
+/*
+ * Copyright (c) 2019 PANTHEON.tech, s.r.o. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.yangtools.util.concurrent;
+
+import com.google.common.annotations.Beta;
+import java.util.concurrent.Executor;
+import org.eclipse.jdt.annotation.NonNullByDefault;
+import org.opendaylight.yangtools.util.concurrent.QueuedNotificationManager.BatchedInvoker;
+
+@Beta
+@NonNullByDefault
+public final class EqualityQueuedNotificationManager<L, N> extends AbstractQueuedNotificationManager<L, L, N> {
+ public EqualityQueuedNotificationManager(final String name, final Executor executor, final int maxQueueCapacity,
+ final BatchedInvoker<L, N> listenerInvoker) {
+ super(name, executor, maxQueueCapacity, listenerInvoker);
+ }
+
+ @Override
+ L wrap(final L listener) {
+ return listener;
+ }
+
+ @Override
+ L unwrap(final L key) {
+ return key;
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2019 PANTHEON.tech, s.r.o. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.yangtools.util.concurrent;
+
+import com.google.common.annotations.Beta;
+import java.util.concurrent.Executor;
+import org.eclipse.jdt.annotation.NonNullByDefault;
+import org.opendaylight.yangtools.util.ForwardingIdentityObject;
+import org.opendaylight.yangtools.util.concurrent.QueuedNotificationManager.BatchedInvoker;
+
+@Beta
+@NonNullByDefault
+public class IdentityQueuedNotificationManager<L, N>
+ extends AbstractQueuedNotificationManager<ForwardingIdentityObject<L>, L, N> {
+ public IdentityQueuedNotificationManager(final String name, final Executor executor, final int maxQueueCapacity,
+ final BatchedInvoker<L, N> listenerInvoker) {
+ super(name, executor, maxQueueCapacity, listenerInvoker);
+ }
+
+ @Override
+ final ForwardingIdentityObject<L> wrap(final L listener) {
+ return ForwardingIdentityObject.of(listener);
+ }
+
+ @Override
+ final L unwrap(final ForwardingIdentityObject<L> key) {
+ return key.getDelegate();
+ }
+}
* @throws RejectedExecutionException if a notification can't be queued for dispatching
*/
void submitNotifications(L listener, Iterable<N> notifications);
-
}
*/
package org.opendaylight.yangtools.util.concurrent;
-import static com.google.common.base.Preconditions.checkArgument;
-import static java.util.Objects.requireNonNull;
-
import com.google.common.collect.ImmutableList;
-import java.util.ArrayDeque;
-import java.util.Collections;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Queue;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executor;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.locks.Condition;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantLock;
-import java.util.stream.Collectors;
-import org.checkerframework.checker.lock.qual.GuardedBy;
import org.eclipse.jdt.annotation.NonNull;
-import org.opendaylight.yangtools.util.ForwardingIdentityObject;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
/**
- * This class manages queuing and dispatching notifications for multiple listeners concurrently.
- * Notifications are queued on a per-listener basis and dispatched serially to each listener via an
- * {@link Executor}.
+ * {@inheritDoc}
*
- * <p>This class optimizes its memory footprint by only allocating and maintaining a queue and executor
- * task for a listener when there are pending notifications. On the first notification(s), a queue
- * is created and a task is submitted to the executor to dispatch the queue to the associated
- * listener. Any subsequent notifications that occur before all previous notifications have been
- * dispatched are appended to the existing queue. When all notifications have been dispatched, the
- * queue and task are discarded.
+ * <p>
+ * This class is pessimistic about listener type and uses identity mapping for comparing them. This is defensive versus
+ * reused objects, maintaining semantics. This may not always be intended, for example if {@code L} is a {@code String}
+ * which is being dynamically determined. In that case we do not want to use identity, but equality, as otherwise
+ * the caller is forced to use {@link String#intern()} -- leading to interning in lookup, which is absolutely
+ * unnecessary. In such use cases, use {@link EqualityQueuedNotificationManager} instead.
*
* @author Thomas Pantelis
- *
- * @param <L> the listener type
- * @param <N> the notification type
*/
-public final class QueuedNotificationManager<L, N> implements NotificationManager<L, N> {
+public final class QueuedNotificationManager<L, N> extends IdentityQueuedNotificationManager<L, N> {
@FunctionalInterface
public interface BatchedInvoker<L, N> {
/**
void invokeListener(@NonNull L listener, @NonNull ImmutableList<N> notifications);
}
- private static final Logger LOG = LoggerFactory.getLogger(QueuedNotificationManager.class);
-
- /**
- * Caps the maximum number of attempts to offer notification to a particular listener. Each
- * attempt window is 1 minute, so an offer times out after roughly 10 minutes.
- */
- private static final int MAX_NOTIFICATION_OFFER_MINUTES = 10;
- private static final long GIVE_UP_NANOS = TimeUnit.MINUTES.toNanos(MAX_NOTIFICATION_OFFER_MINUTES);
- private static final long TASK_WAIT_NANOS = TimeUnit.MILLISECONDS.toNanos(10);
-
- /**
- * We key by listener reference identity hashCode/equals.
- * Since we don't know anything about the listener class implementations and we're mixing
- * multiple listener class instances in the same map, this avoids any potential issue with an
- * equals implementation that just blindly casts the other Object to compare instead of checking
- * for instanceof.
- */
- private final ConcurrentMap<ForwardingIdentityObject<L>, NotificationTask> listenerCache =
- new ConcurrentHashMap<>();
- private final @NonNull QueuedNotificationManagerMXBean mxBean = new QueuedNotificationManagerMXBeanImpl(this);
- private final @NonNull BatchedInvoker<L, N> listenerInvoker;
- private final @NonNull Executor executor;
- private final @NonNull String name;
- private final int maxQueueCapacity;
-
- private QueuedNotificationManager(final @NonNull Executor executor,
- final @NonNull BatchedInvoker<L, N> listenerInvoker, final int maxQueueCapacity,
- final @NonNull String name) {
- checkArgument(maxQueueCapacity > 0, "Invalid maxQueueCapacity %s must be > 0", maxQueueCapacity);
- this.executor = requireNonNull(executor);
- this.listenerInvoker = requireNonNull(listenerInvoker);
- this.maxQueueCapacity = maxQueueCapacity;
- this.name = requireNonNull(name);
+ QueuedNotificationManager(final @NonNull Executor executor, final @NonNull BatchedInvoker<L, N> listenerInvoker,
+ final int maxQueueCapacity, final @NonNull String name) {
+ super(name, executor, maxQueueCapacity, listenerInvoker);
}
/**
final @NonNull String name) {
return new QueuedNotificationManager<>(executor, listenerInvoker, maxQueueCapacity, name);
}
-
- /**
- * Returns the maximum listener queue capacity.
- */
- public int getMaxQueueCapacity() {
- return maxQueueCapacity;
- }
-
- /**
- * Return an {@link QueuedNotificationManagerMXBean} tied to this instance.
- *
- * @return An QueuedNotificationManagerMXBean object.
- */
- public @NonNull QueuedNotificationManagerMXBean getMXBean() {
- return mxBean;
- }
-
- /**
- * Returns the {@link Executor} to used for notification tasks.
- */
- public @NonNull Executor getExecutor() {
- return executor;
- }
-
- /* (non-Javadoc)
- * @see org.opendaylight.yangtools.util.concurrent.NotificationManager#addNotification(L, N)
- */
- @Override
- public void submitNotification(final L listener, final N notification) {
- if (notification != null) {
- submitNotifications(listener, Collections.singletonList(notification));
- }
- }
-
- /* (non-Javadoc)
- * @see org.opendaylight.yangtools.util.concurrent.NotificationManager#submitNotifications(L, java.util.Collection)
- */
- @Override
- public void submitNotifications(final L listener, final Iterable<N> notifications) {
-
- if (notifications == null || listener == null) {
- return;
- }
-
- LOG.trace("{}: submitNotifications for listener {}: {}", name, listener, notifications);
-
- final ForwardingIdentityObject<L> key = ForwardingIdentityObject.of(listener);
-
- // Keep looping until we are either able to add a new NotificationTask or are able to
- // add our notifications to an existing NotificationTask. Eventually one or the other
- // will occur.
- try {
- Iterator<N> it = notifications.iterator();
-
- while (true) {
- NotificationTask task = listenerCache.get(key);
- if (task == null) {
- // No task found, try to insert a new one
- final NotificationTask newTask = new NotificationTask(key, it);
- task = listenerCache.putIfAbsent(key, newTask);
- if (task == null) {
- // We were able to put our new task - now submit it to the executor and
- // we're done. If it throws a RejectedExecutionException, let that propagate
- // to the caller.
- runTask(listener, newTask);
- break;
- }
-
- // We have a racing task, hence we can continue, but we need to refresh our iterator from
- // the task.
- it = newTask.recoverItems();
- }
-
- final boolean completed = task.submitNotifications(it);
- if (!completed) {
- // Task is indicating it is exiting before it has consumed all the items and is exiting. Rather
- // than spinning on removal, we try to replace it.
- final NotificationTask newTask = new NotificationTask(key, it);
- if (listenerCache.replace(key, task, newTask)) {
- runTask(listener, newTask);
- break;
- }
-
- // We failed to replace the task, hence we need retry. Note we have to recover the items to be
- // published from the new task.
- it = newTask.recoverItems();
- LOG.debug("{}: retrying task queueing for {}", name, listener);
- continue;
- }
-
- // All notifications have either been delivered or we have timed out and warned about the ones we
- // have failed to deliver. In any case we are done here.
- break;
- }
- } catch (InterruptedException e) {
- // We were interrupted trying to offer to the listener's queue. Somebody's probably
- // telling us to quit.
- LOG.warn("{}: Interrupted trying to add to {} listener's queue", name, listener);
- }
-
- LOG.trace("{}: submitNotifications done for listener {}", name, listener);
- }
-
- /**
- * Returns {@link ListenerNotificationQueueStats} instances for each current listener
- * notification task in progress.
- */
- public List<ListenerNotificationQueueStats> getListenerNotificationQueueStats() {
- return listenerCache.values().stream().map(t -> new ListenerNotificationQueueStats(t.listenerKey.toString(),
- t.size())).collect(Collectors.toList());
- }
-
- private void runTask(final L listener, final NotificationTask task) {
- LOG.debug("{}: Submitting NotificationTask for listener {}", name, listener);
- executor.execute(task);
- }
-
- /**
- * Executor task for a single listener that queues notifications and sends them serially to the
- * listener.
- */
- private class NotificationTask implements Runnable {
- private final Lock lock = new ReentrantLock();
- private final Condition notEmpty = lock.newCondition();
- private final Condition notFull = lock.newCondition();
- private final @NonNull ForwardingIdentityObject<L> listenerKey;
-
- @GuardedBy("lock")
- private final Queue<N> queue = new ArrayDeque<>();
- @GuardedBy("lock")
- private boolean exiting;
-
- NotificationTask(final @NonNull ForwardingIdentityObject<L> listenerKey,
- final @NonNull Iterator<N> notifications) {
- this.listenerKey = requireNonNull(listenerKey);
- while (notifications.hasNext()) {
- queue.add(notifications.next());
- }
- }
-
- @NonNull Iterator<N> recoverItems() {
- // This violates @GuardedBy annotation, but is invoked only when the task is not started and will never
- // get started, hence this is safe.
- return queue.iterator();
- }
-
- int size() {
- lock.lock();
- try {
- return queue.size();
- } finally {
- lock.unlock();
- }
- }
-
- boolean submitNotifications(final @NonNull Iterator<N> notifications) throws InterruptedException {
- final long start = System.nanoTime();
- final long deadline = start + GIVE_UP_NANOS;
-
- lock.lock();
- try {
- // Lock may have blocked for some time, we need to take that into account. We may have exceedded
- // the deadline, but that is unlikely and even in that case we can make some progress without further
- // blocking.
- long canWait = deadline - System.nanoTime();
-
- while (true) {
- // Check the exiting flag - if true then #run is in the process of exiting so return
- // false to indicate such. Otherwise, offer the notifications to the queue.
- if (exiting) {
- return false;
- }
-
- final int avail = maxQueueCapacity - queue.size();
- if (avail <= 0) {
- if (canWait <= 0) {
- LOG.warn("{}: Failed to offer notifications {} to the queue for listener {}. Exceeded"
- + "maximum allowable time of {} minutes; the listener is likely in an unrecoverable"
- + "state (deadlock or endless loop). ", name, ImmutableList.copyOf(notifications),
- listenerKey, MAX_NOTIFICATION_OFFER_MINUTES);
- return true;
- }
-
- canWait = notFull.awaitNanos(canWait);
- continue;
- }
-
- for (int i = 0; i < avail; ++i) {
- if (!notifications.hasNext()) {
- notEmpty.signal();
- return true;
- }
-
- queue.add(notifications.next());
- }
- }
- } finally {
- lock.unlock();
- }
- }
-
- @GuardedBy("lock")
- private boolean waitForQueue() {
- long timeout = TASK_WAIT_NANOS;
-
- while (queue.isEmpty()) {
- if (timeout <= 0) {
- return false;
- }
-
- try {
- timeout = notEmpty.awaitNanos(timeout);
- } catch (InterruptedException e) {
- // The executor is probably shutting down so log as debug.
- LOG.debug("{}: Interrupted trying to remove from {} listener's queue", name, listenerKey);
- return false;
- }
- }
-
- return true;
- }
-
- @Override
- public void run() {
- try {
- // Loop until we've dispatched all the notifications in the queue.
- while (true) {
- final @NonNull ImmutableList<N> notifications;
-
- lock.lock();
- try {
- if (!waitForQueue()) {
- exiting = true;
- break;
- }
-
- // Splice the entire queue
- notifications = ImmutableList.copyOf(queue);
- queue.clear();
-
- notFull.signalAll();
- } finally {
- lock.unlock();
- }
-
- invokeListener(notifications);
- }
- } finally {
- // We're exiting, gracefully or not - either way make sure we always remove
- // ourselves from the cache.
- listenerCache.remove(listenerKey, this);
- }
- }
-
- @SuppressWarnings("checkstyle:illegalCatch")
- private void invokeListener(final @NonNull ImmutableList<N> notifications) {
- LOG.debug("{}: Invoking listener {} with notification: {}", name, listenerKey, notifications);
- try {
- listenerInvoker.invokeListener(listenerKey.getDelegate(), notifications);
- } catch (Exception e) {
- // We'll let a RuntimeException from the listener slide and keep sending any remaining notifications.
- LOG.error("{}: Error notifying listener {} with {}", name, listenerKey, notifications, e);
- }
- }
- }
}
import java.util.List;
final class QueuedNotificationManagerMXBeanImpl implements QueuedNotificationManagerMXBean {
- private final QueuedNotificationManager<?, ?> manager;
+ private final AbstractQueuedNotificationManager<?, ?, ?> manager;
- QueuedNotificationManagerMXBeanImpl(final QueuedNotificationManager<?, ?> manager) {
+ QueuedNotificationManagerMXBeanImpl(final AbstractQueuedNotificationManager<?, ?, ?> manager) {
this.manager = requireNonNull(manager);
}