From 714dbbbc6d6573ec211fa28cb2794102edd1f397 Mon Sep 17 00:00:00 2001 From: Robert Varga Date: Fri, 16 Aug 2019 18:00:47 +0200 Subject: [PATCH] Separate out {Identity,Equality}QueuedNotificationManager There are use cases where we would like to use QNM on equality, i.e. we do not realy have a listener instance, but rather a key to somewhere else. That other place operates on equality, which makes it incompatible with the current semantics. This splits out the baseline queueing logic in from NotificationManager interface, so that it can be reused to implement different execution concepts. QueuedNotificationManager is realized on top of it, so that the baseline interface is implemented in AbstractQueuedNotificationManager and then specialized to different lookups. JIRA: YANGTOOLS-1038 Change-Id: I1123d21664a20ff380ccb7db7096801915491f6e Signed-off-by: Robert Varga (cherry picked from commit 807d0868fa0e9b81332ea12d8fe19f81f88efc08) --- .../concurrent/AbstractBatchingExecutor.java | 312 ++++++++++++++++ .../AbstractQueuedNotificationManager.java | 106 ++++++ .../EqualityQueuedNotificationManager.java | 32 ++ .../IdentityQueuedNotificationManager.java | 34 ++ .../util/concurrent/NotificationManager.java | 1 - .../concurrent/QueuedNotificationManager.java | 341 +----------------- .../QueuedNotificationManagerMXBeanImpl.java | 4 +- 7 files changed, 497 insertions(+), 333 deletions(-) create mode 100644 common/util/src/main/java/org/opendaylight/yangtools/util/concurrent/AbstractBatchingExecutor.java create mode 100644 common/util/src/main/java/org/opendaylight/yangtools/util/concurrent/AbstractQueuedNotificationManager.java create mode 100644 common/util/src/main/java/org/opendaylight/yangtools/util/concurrent/EqualityQueuedNotificationManager.java create mode 100644 common/util/src/main/java/org/opendaylight/yangtools/util/concurrent/IdentityQueuedNotificationManager.java diff --git a/common/util/src/main/java/org/opendaylight/yangtools/util/concurrent/AbstractBatchingExecutor.java b/common/util/src/main/java/org/opendaylight/yangtools/util/concurrent/AbstractBatchingExecutor.java new file mode 100644 index 0000000000..7b40ea38be --- /dev/null +++ b/common/util/src/main/java/org/opendaylight/yangtools/util/concurrent/AbstractBatchingExecutor.java @@ -0,0 +1,312 @@ +/* + * Copyright (c) 2014 Brocade Communications Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.yangtools.util.concurrent; + +import static com.google.common.base.Preconditions.checkArgument; +import static java.util.Objects.requireNonNull; + +import com.google.common.collect.ImmutableList; +import java.util.ArrayDeque; +import java.util.Collections; +import java.util.Iterator; +import java.util.Queue; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.Executor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; +import java.util.stream.Stream; +import org.checkerframework.checker.lock.qual.GuardedBy; +import org.eclipse.jdt.annotation.NonNull; +import org.opendaylight.yangtools.concepts.AbstractIdentifiable; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * This class manages queuing and dispatching tasks for multiple workers concurrently. Tasks are queued on a per-worker + * basis and dispatched serially to each worker via an {@link Executor}. + * + *

This class optimizes its memory footprint by only allocating and maintaining a queue and executor + * task for a worker when there are pending tasks. On the first task(s), a queue is created and a dispatcher task is + * submitted to the executor to dispatch the queue to the associated worker. Any subsequent tasks that occur before all + * previous tasks have been dispatched are appended to the existing queue. When all tasks have been dispatched, + * the queue and dispatcher task are discarded. + * + * @author Thomas Pantelis + * @author Robert Varga + * + * @param worker key type + * @param task type + */ +abstract class AbstractBatchingExecutor extends AbstractIdentifiable { + private static final Logger LOG = LoggerFactory.getLogger(AbstractBatchingExecutor.class); + + /** + * Caps the maximum number of attempts to offer a task to a particular worker. Each attempt window is 1 minute, so + * an offer times out after roughly 10 minutes. + */ + private static final int MAX_NOTIFICATION_OFFER_MINUTES = 10; + private static final long GIVE_UP_NANOS = TimeUnit.MINUTES.toNanos(MAX_NOTIFICATION_OFFER_MINUTES); + private static final long TASK_WAIT_NANOS = TimeUnit.MILLISECONDS.toNanos(10); + + private final ConcurrentMap dispatcherTasks = new ConcurrentHashMap<>(); + private final @NonNull Executor executor; + private final int maxQueueCapacity; + + AbstractBatchingExecutor(final @NonNull String name, final @NonNull Executor executor, final int maxQueueCapacity) { + super(name); + this.executor = requireNonNull(executor); + checkArgument(maxQueueCapacity > 0, "Invalid maxQueueCapacity %s must be > 0", maxQueueCapacity); + this.maxQueueCapacity = maxQueueCapacity; + } + + /** + * Returns the maximum worker queue capacity. + */ + final int maxQueueCapacity() { + return maxQueueCapacity; + } + + /** + * Returns the {@link Executor} to used for dispatcher tasks. + */ + final @NonNull Executor executor() { + return executor; + } + + // FIXME: YANGTOOLS-1016: allow explicit blocking control + final void submitTask(final K key, final T task) { + submitTasks(key, Collections.singletonList(requireNonNull(task))); + } + + // FIXME: YANGTOOLS-1016: allow explicit blocking control with return of un-enqueued tasks (or removal from input) + final void submitTasks(final K key, final Iterable tasks) { + if (tasks == null || key == null) { + return; + } + + LOG.trace("{}: submitTasks for worker {}: {}", getIdentifier(), key, tasks); + + // Keep looping until we are either able to add a new DispatcherTask or are able to add our tasks to an existing + // DispatcherTask. Eventually one or the other will occur. + try { + Iterator it = tasks.iterator(); + + while (true) { + DispatcherTask task = dispatcherTasks.get(key); + if (task == null) { + // No task found, try to insert a new one + final DispatcherTask newTask = new DispatcherTask(key, it); + task = dispatcherTasks.putIfAbsent(key, newTask); + if (task == null) { + // We were able to put our new task - now submit it to the executor and we're done. If it throws + // a RejectedExecutionException, let that propagate to the caller. + runTask(key, newTask); + break; + } + + // We have a racing task, hence we can continue, but we need to refresh our iterator from the task. + it = newTask.recoverItems(); + } + + final boolean completed = task.submitTasks(it); + if (!completed) { + // Task is indicating it is exiting before it has consumed all the items and is exiting. Rather + // than spinning on removal, we try to replace it. + final DispatcherTask newTask = new DispatcherTask(key, it); + if (dispatcherTasks.replace(key, task, newTask)) { + runTask(key, newTask); + break; + } + + // We failed to replace the task, hence we need retry. Note we have to recover the items to be + // published from the new task. + it = newTask.recoverItems(); + LOG.debug("{}: retrying task queueing for {}", getIdentifier(), key); + continue; + } + + // All tasks have either been delivered or we have timed out and warned about the ones we have failed + // to deliver. In any case we are done here. + break; + } + } catch (InterruptedException e) { + // We were interrupted trying to offer to the worker's queue. Somebody's probably telling us to quit. + LOG.warn("{}: Interrupted trying to add to {} worker's queue", getIdentifier(), key); + } + + LOG.trace("{}: submitTasks done for worker {}", getIdentifier(), key); + } + + final Stream streamTasks() { + return dispatcherTasks.values().stream(); + } + + abstract void executeBatch(K key, @NonNull ImmutableList tasks) throws Exception; + + private void runTask(final K key, final DispatcherTask task) { + LOG.debug("{}: Submitting DispatcherTask for worker {}", getIdentifier(), key); + executor.execute(task); + } + + /** + * Executor task for a single worker that queues tasks and sends them serially to the worker. + */ + final class DispatcherTask implements Runnable { + private final Lock lock = new ReentrantLock(); + private final Condition notEmpty = lock.newCondition(); + private final Condition notFull = lock.newCondition(); + private final @NonNull K key; + + @GuardedBy("lock") + private final Queue queue = new ArrayDeque<>(); + @GuardedBy("lock") + private boolean exiting; + + DispatcherTask(final @NonNull K key, final @NonNull Iterator tasks) { + this.key = requireNonNull(key); + while (tasks.hasNext()) { + final T task = tasks.next(); + if (task != null) { + queue.add(task); + } + } + } + + @NonNull Iterator recoverItems() { + // This violates @GuardedBy annotation, but is invoked only when the task is not started and will never + // get started, hence this is safe. + return queue.iterator(); + } + + @NonNull K key() { + return key; + } + + int size() { + lock.lock(); + try { + return queue.size(); + } finally { + lock.unlock(); + } + } + + boolean submitTasks(final @NonNull Iterator tasks) throws InterruptedException { + final long start = System.nanoTime(); + final long deadline = start + GIVE_UP_NANOS; + + lock.lock(); + try { + // Lock may have blocked for some time, we need to take that into account. We may have exceeded + // the deadline, but that is unlikely and even in that case we can make some progress without further + // blocking. + long canWait = deadline - System.nanoTime(); + + while (true) { + // Check the exiting flag - if true then #run is in the process of exiting so return false + // to indicate such. Otherwise, offer the tasks to the queue. + if (exiting) { + return false; + } + + final int avail = maxQueueCapacity - queue.size(); + if (avail <= 0) { + if (canWait <= 0) { + LOG.warn("{}: Failed to offer tasks {} to the queue for worker {}. Exceeded " + + "maximum allowable time of {} minutes; the worker is likely in an unrecoverable " + + "state (deadlock or endless loop). ", getIdentifier(), ImmutableList.copyOf(tasks), + key, MAX_NOTIFICATION_OFFER_MINUTES); + return true; + } + + canWait = notFull.awaitNanos(canWait); + continue; + } + + for (int i = 0; i < avail; ++i) { + if (!tasks.hasNext()) { + notEmpty.signal(); + return true; + } + + queue.add(tasks.next()); + } + } + } finally { + lock.unlock(); + } + } + + @GuardedBy("lock") + private boolean waitForQueue() { + long timeout = TASK_WAIT_NANOS; + + while (queue.isEmpty()) { + if (timeout <= 0) { + return false; + } + + try { + timeout = notEmpty.awaitNanos(timeout); + } catch (InterruptedException e) { + // The executor is probably shutting down so log as debug. + LOG.debug("{}: Interrupted trying to remove from {} worker's queue", getIdentifier(), key); + return false; + } + } + + return true; + } + + @Override + public void run() { + try { + // Loop until we've dispatched all the tasks in the queue. + while (true) { + final @NonNull ImmutableList tasks; + + lock.lock(); + try { + if (!waitForQueue()) { + exiting = true; + break; + } + + // Splice the entire queue + tasks = ImmutableList.copyOf(queue); + queue.clear(); + + notFull.signalAll(); + } finally { + lock.unlock(); + } + + invokeWorker(tasks); + } + } finally { + // We're exiting, gracefully or not - either way make sure we always remove + // ourselves from the cache. + dispatcherTasks.remove(key, this); + } + } + + @SuppressWarnings("checkstyle:illegalCatch") + private void invokeWorker(final @NonNull ImmutableList tasks) { + LOG.debug("{}: Invoking worker {} with tasks: {}", getIdentifier(), key, tasks); + try { + executeBatch(key, tasks); + } catch (Exception e) { + // We'll let a RuntimeException from the worker slide and keep sending any remaining tasks. + LOG.error("{}: Error invoking worker {} with {}", getIdentifier(), key, tasks, e); + } + } + } +} diff --git a/common/util/src/main/java/org/opendaylight/yangtools/util/concurrent/AbstractQueuedNotificationManager.java b/common/util/src/main/java/org/opendaylight/yangtools/util/concurrent/AbstractQueuedNotificationManager.java new file mode 100644 index 0000000000..50f78373b0 --- /dev/null +++ b/common/util/src/main/java/org/opendaylight/yangtools/util/concurrent/AbstractQueuedNotificationManager.java @@ -0,0 +1,106 @@ +/* + * Copyright (c) 2014 Brocade Communications Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.yangtools.util.concurrent; + +import static java.util.Objects.requireNonNull; + +import com.google.common.collect.ImmutableList; +import java.util.List; +import java.util.concurrent.Executor; +import java.util.stream.Collectors; +import org.eclipse.jdt.annotation.NonNull; +import org.eclipse.jdt.annotation.NonNullByDefault; +import org.eclipse.jdt.annotation.Nullable; +import org.opendaylight.yangtools.util.concurrent.QueuedNotificationManager.BatchedInvoker; + +/** + * This class manages queuing and dispatching notifications for multiple listeners concurrently. + * Notifications are queued on a per-listener basis and dispatched serially to each listener via an + * {@link Executor}. + * + *

This class optimizes its memory footprint by only allocating and maintaining a queue and executor + * task for a listener when there are pending notifications. On the first notification(s), a queue + * is created and a task is submitted to the executor to dispatch the queue to the associated + * listener. Any subsequent notifications that occur before all previous notifications have been + * dispatched are appended to the existing queue. When all notifications have been dispatched, the + * queue and task are discarded. + * + * @author Thomas Pantelis + * + * @param the listener type + * @param the notification type + */ +@NonNullByDefault +abstract class AbstractQueuedNotificationManager extends AbstractBatchingExecutor + implements NotificationManager { + + private final QueuedNotificationManagerMXBean mxBean = new QueuedNotificationManagerMXBeanImpl(this); + private final BatchedInvoker listenerInvoker; + + AbstractQueuedNotificationManager(final String name, final Executor executor, final int maxQueueCapacity, + final BatchedInvoker listenerInvoker) { + super(name, executor, maxQueueCapacity); + this.listenerInvoker = requireNonNull(listenerInvoker); + } + + /** + * Returns the {@link Executor} to used for notification tasks. + */ + public final Executor getExecutor() { + return executor(); + } + + /** + * Returns the maximum listener queue capacity. + */ + public final int getMaxQueueCapacity() { + return maxQueueCapacity(); + } + + /** + * Return an {@link QueuedNotificationManagerMXBean} tied to this instance. + * + * @return An QueuedNotificationManagerMXBean object. + */ + public final QueuedNotificationManagerMXBean getMXBean() { + return mxBean; + } + + /** + * Returns {@link ListenerNotificationQueueStats} instances for each current listener + * notification task in progress. + */ + // FIXME: drop visibility to package-protected + public final List getListenerNotificationQueueStats() { + return streamTasks().map(t -> new ListenerNotificationQueueStats(t.key().toString(), t.size())) + .collect(Collectors.toList()); + } + + @Override + public final void submitNotification(final L listener, final N notification) { + if (listener != null && notification != null) { + submitTask(wrap(listener), notification); + } + } + + @Override + public final void submitNotifications(final L listener, final @Nullable Iterable notifications) { + if (listener != null && notifications != null) { + submitTasks(wrap(listener), notifications); + } + } + + @Override + final void executeBatch(final T key, final @NonNull ImmutableList tasks) { + listenerInvoker.invokeListener(unwrap(key), tasks); + } + + abstract T wrap(L listener); + + abstract L unwrap(T key); +} diff --git a/common/util/src/main/java/org/opendaylight/yangtools/util/concurrent/EqualityQueuedNotificationManager.java b/common/util/src/main/java/org/opendaylight/yangtools/util/concurrent/EqualityQueuedNotificationManager.java new file mode 100644 index 0000000000..6531c5e137 --- /dev/null +++ b/common/util/src/main/java/org/opendaylight/yangtools/util/concurrent/EqualityQueuedNotificationManager.java @@ -0,0 +1,32 @@ +/* + * Copyright (c) 2019 PANTHEON.tech, s.r.o. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.yangtools.util.concurrent; + +import com.google.common.annotations.Beta; +import java.util.concurrent.Executor; +import org.eclipse.jdt.annotation.NonNullByDefault; +import org.opendaylight.yangtools.util.concurrent.QueuedNotificationManager.BatchedInvoker; + +@Beta +@NonNullByDefault +public final class EqualityQueuedNotificationManager extends AbstractQueuedNotificationManager { + public EqualityQueuedNotificationManager(final String name, final Executor executor, final int maxQueueCapacity, + final BatchedInvoker listenerInvoker) { + super(name, executor, maxQueueCapacity, listenerInvoker); + } + + @Override + L wrap(final L listener) { + return listener; + } + + @Override + L unwrap(final L key) { + return key; + } +} diff --git a/common/util/src/main/java/org/opendaylight/yangtools/util/concurrent/IdentityQueuedNotificationManager.java b/common/util/src/main/java/org/opendaylight/yangtools/util/concurrent/IdentityQueuedNotificationManager.java new file mode 100644 index 0000000000..c22868034f --- /dev/null +++ b/common/util/src/main/java/org/opendaylight/yangtools/util/concurrent/IdentityQueuedNotificationManager.java @@ -0,0 +1,34 @@ +/* + * Copyright (c) 2019 PANTHEON.tech, s.r.o. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.yangtools.util.concurrent; + +import com.google.common.annotations.Beta; +import java.util.concurrent.Executor; +import org.eclipse.jdt.annotation.NonNullByDefault; +import org.opendaylight.yangtools.util.ForwardingIdentityObject; +import org.opendaylight.yangtools.util.concurrent.QueuedNotificationManager.BatchedInvoker; + +@Beta +@NonNullByDefault +public class IdentityQueuedNotificationManager + extends AbstractQueuedNotificationManager, L, N> { + public IdentityQueuedNotificationManager(final String name, final Executor executor, final int maxQueueCapacity, + final BatchedInvoker listenerInvoker) { + super(name, executor, maxQueueCapacity, listenerInvoker); + } + + @Override + final ForwardingIdentityObject wrap(final L listener) { + return ForwardingIdentityObject.of(listener); + } + + @Override + final L unwrap(final ForwardingIdentityObject key) { + return key.getDelegate(); + } +} diff --git a/common/util/src/main/java/org/opendaylight/yangtools/util/concurrent/NotificationManager.java b/common/util/src/main/java/org/opendaylight/yangtools/util/concurrent/NotificationManager.java index 531adfcb81..3c21d65a6c 100644 --- a/common/util/src/main/java/org/opendaylight/yangtools/util/concurrent/NotificationManager.java +++ b/common/util/src/main/java/org/opendaylight/yangtools/util/concurrent/NotificationManager.java @@ -41,5 +41,4 @@ public interface NotificationManager { * @throws RejectedExecutionException if a notification can't be queued for dispatching */ void submitNotifications(L listener, Iterable notifications); - } diff --git a/common/util/src/main/java/org/opendaylight/yangtools/util/concurrent/QueuedNotificationManager.java b/common/util/src/main/java/org/opendaylight/yangtools/util/concurrent/QueuedNotificationManager.java index dd0bcb7c1e..8839099af0 100644 --- a/common/util/src/main/java/org/opendaylight/yangtools/util/concurrent/QueuedNotificationManager.java +++ b/common/util/src/main/java/org/opendaylight/yangtools/util/concurrent/QueuedNotificationManager.java @@ -7,47 +7,23 @@ */ package org.opendaylight.yangtools.util.concurrent; -import static com.google.common.base.Preconditions.checkArgument; -import static java.util.Objects.requireNonNull; - import com.google.common.collect.ImmutableList; -import java.util.ArrayDeque; -import java.util.Collections; -import java.util.Iterator; -import java.util.List; -import java.util.Queue; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; import java.util.concurrent.Executor; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.locks.Condition; -import java.util.concurrent.locks.Lock; -import java.util.concurrent.locks.ReentrantLock; -import java.util.stream.Collectors; -import org.checkerframework.checker.lock.qual.GuardedBy; import org.eclipse.jdt.annotation.NonNull; -import org.opendaylight.yangtools.util.ForwardingIdentityObject; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; /** - * This class manages queuing and dispatching notifications for multiple listeners concurrently. - * Notifications are queued on a per-listener basis and dispatched serially to each listener via an - * {@link Executor}. + * {@inheritDoc} * - *

This class optimizes its memory footprint by only allocating and maintaining a queue and executor - * task for a listener when there are pending notifications. On the first notification(s), a queue - * is created and a task is submitted to the executor to dispatch the queue to the associated - * listener. Any subsequent notifications that occur before all previous notifications have been - * dispatched are appended to the existing queue. When all notifications have been dispatched, the - * queue and task are discarded. + *

+ * This class is pessimistic about listener type and uses identity mapping for comparing them. This is defensive versus + * reused objects, maintaining semantics. This may not always be intended, for example if {@code L} is a {@code String} + * which is being dynamically determined. In that case we do not want to use identity, but equality, as otherwise + * the caller is forced to use {@link String#intern()} -- leading to interning in lookup, which is absolutely + * unnecessary. In such use cases, use {@link EqualityQueuedNotificationManager} instead. * * @author Thomas Pantelis - * - * @param the listener type - * @param the notification type */ -public final class QueuedNotificationManager implements NotificationManager { +public final class QueuedNotificationManager extends IdentityQueuedNotificationManager { @FunctionalInterface public interface BatchedInvoker { /** @@ -59,39 +35,9 @@ public final class QueuedNotificationManager implements NotificationManage void invokeListener(@NonNull L listener, @NonNull ImmutableList notifications); } - private static final Logger LOG = LoggerFactory.getLogger(QueuedNotificationManager.class); - - /** - * Caps the maximum number of attempts to offer notification to a particular listener. Each - * attempt window is 1 minute, so an offer times out after roughly 10 minutes. - */ - private static final int MAX_NOTIFICATION_OFFER_MINUTES = 10; - private static final long GIVE_UP_NANOS = TimeUnit.MINUTES.toNanos(MAX_NOTIFICATION_OFFER_MINUTES); - private static final long TASK_WAIT_NANOS = TimeUnit.MILLISECONDS.toNanos(10); - - /** - * We key by listener reference identity hashCode/equals. - * Since we don't know anything about the listener class implementations and we're mixing - * multiple listener class instances in the same map, this avoids any potential issue with an - * equals implementation that just blindly casts the other Object to compare instead of checking - * for instanceof. - */ - private final ConcurrentMap, NotificationTask> listenerCache = - new ConcurrentHashMap<>(); - private final @NonNull QueuedNotificationManagerMXBean mxBean = new QueuedNotificationManagerMXBeanImpl(this); - private final @NonNull BatchedInvoker listenerInvoker; - private final @NonNull Executor executor; - private final @NonNull String name; - private final int maxQueueCapacity; - - private QueuedNotificationManager(final @NonNull Executor executor, - final @NonNull BatchedInvoker listenerInvoker, final int maxQueueCapacity, - final @NonNull String name) { - checkArgument(maxQueueCapacity > 0, "Invalid maxQueueCapacity %s must be > 0", maxQueueCapacity); - this.executor = requireNonNull(executor); - this.listenerInvoker = requireNonNull(listenerInvoker); - this.maxQueueCapacity = maxQueueCapacity; - this.name = requireNonNull(name); + QueuedNotificationManager(final @NonNull Executor executor, final @NonNull BatchedInvoker listenerInvoker, + final int maxQueueCapacity, final @NonNull String name) { + super(name, executor, maxQueueCapacity, listenerInvoker); } /** @@ -107,269 +53,4 @@ public final class QueuedNotificationManager implements NotificationManage final @NonNull String name) { return new QueuedNotificationManager<>(executor, listenerInvoker, maxQueueCapacity, name); } - - /** - * Returns the maximum listener queue capacity. - */ - public int getMaxQueueCapacity() { - return maxQueueCapacity; - } - - /** - * Return an {@link QueuedNotificationManagerMXBean} tied to this instance. - * - * @return An QueuedNotificationManagerMXBean object. - */ - public @NonNull QueuedNotificationManagerMXBean getMXBean() { - return mxBean; - } - - /** - * Returns the {@link Executor} to used for notification tasks. - */ - public @NonNull Executor getExecutor() { - return executor; - } - - /* (non-Javadoc) - * @see org.opendaylight.yangtools.util.concurrent.NotificationManager#addNotification(L, N) - */ - @Override - public void submitNotification(final L listener, final N notification) { - if (notification != null) { - submitNotifications(listener, Collections.singletonList(notification)); - } - } - - /* (non-Javadoc) - * @see org.opendaylight.yangtools.util.concurrent.NotificationManager#submitNotifications(L, java.util.Collection) - */ - @Override - public void submitNotifications(final L listener, final Iterable notifications) { - - if (notifications == null || listener == null) { - return; - } - - LOG.trace("{}: submitNotifications for listener {}: {}", name, listener, notifications); - - final ForwardingIdentityObject key = ForwardingIdentityObject.of(listener); - - // Keep looping until we are either able to add a new NotificationTask or are able to - // add our notifications to an existing NotificationTask. Eventually one or the other - // will occur. - try { - Iterator it = notifications.iterator(); - - while (true) { - NotificationTask task = listenerCache.get(key); - if (task == null) { - // No task found, try to insert a new one - final NotificationTask newTask = new NotificationTask(key, it); - task = listenerCache.putIfAbsent(key, newTask); - if (task == null) { - // We were able to put our new task - now submit it to the executor and - // we're done. If it throws a RejectedExecutionException, let that propagate - // to the caller. - runTask(listener, newTask); - break; - } - - // We have a racing task, hence we can continue, but we need to refresh our iterator from - // the task. - it = newTask.recoverItems(); - } - - final boolean completed = task.submitNotifications(it); - if (!completed) { - // Task is indicating it is exiting before it has consumed all the items and is exiting. Rather - // than spinning on removal, we try to replace it. - final NotificationTask newTask = new NotificationTask(key, it); - if (listenerCache.replace(key, task, newTask)) { - runTask(listener, newTask); - break; - } - - // We failed to replace the task, hence we need retry. Note we have to recover the items to be - // published from the new task. - it = newTask.recoverItems(); - LOG.debug("{}: retrying task queueing for {}", name, listener); - continue; - } - - // All notifications have either been delivered or we have timed out and warned about the ones we - // have failed to deliver. In any case we are done here. - break; - } - } catch (InterruptedException e) { - // We were interrupted trying to offer to the listener's queue. Somebody's probably - // telling us to quit. - LOG.warn("{}: Interrupted trying to add to {} listener's queue", name, listener); - } - - LOG.trace("{}: submitNotifications done for listener {}", name, listener); - } - - /** - * Returns {@link ListenerNotificationQueueStats} instances for each current listener - * notification task in progress. - */ - public List getListenerNotificationQueueStats() { - return listenerCache.values().stream().map(t -> new ListenerNotificationQueueStats(t.listenerKey.toString(), - t.size())).collect(Collectors.toList()); - } - - private void runTask(final L listener, final NotificationTask task) { - LOG.debug("{}: Submitting NotificationTask for listener {}", name, listener); - executor.execute(task); - } - - /** - * Executor task for a single listener that queues notifications and sends them serially to the - * listener. - */ - private class NotificationTask implements Runnable { - private final Lock lock = new ReentrantLock(); - private final Condition notEmpty = lock.newCondition(); - private final Condition notFull = lock.newCondition(); - private final @NonNull ForwardingIdentityObject listenerKey; - - @GuardedBy("lock") - private final Queue queue = new ArrayDeque<>(); - @GuardedBy("lock") - private boolean exiting; - - NotificationTask(final @NonNull ForwardingIdentityObject listenerKey, - final @NonNull Iterator notifications) { - this.listenerKey = requireNonNull(listenerKey); - while (notifications.hasNext()) { - queue.add(notifications.next()); - } - } - - @NonNull Iterator recoverItems() { - // This violates @GuardedBy annotation, but is invoked only when the task is not started and will never - // get started, hence this is safe. - return queue.iterator(); - } - - int size() { - lock.lock(); - try { - return queue.size(); - } finally { - lock.unlock(); - } - } - - boolean submitNotifications(final @NonNull Iterator notifications) throws InterruptedException { - final long start = System.nanoTime(); - final long deadline = start + GIVE_UP_NANOS; - - lock.lock(); - try { - // Lock may have blocked for some time, we need to take that into account. We may have exceedded - // the deadline, but that is unlikely and even in that case we can make some progress without further - // blocking. - long canWait = deadline - System.nanoTime(); - - while (true) { - // Check the exiting flag - if true then #run is in the process of exiting so return - // false to indicate such. Otherwise, offer the notifications to the queue. - if (exiting) { - return false; - } - - final int avail = maxQueueCapacity - queue.size(); - if (avail <= 0) { - if (canWait <= 0) { - LOG.warn("{}: Failed to offer notifications {} to the queue for listener {}. Exceeded" - + "maximum allowable time of {} minutes; the listener is likely in an unrecoverable" - + "state (deadlock or endless loop). ", name, ImmutableList.copyOf(notifications), - listenerKey, MAX_NOTIFICATION_OFFER_MINUTES); - return true; - } - - canWait = notFull.awaitNanos(canWait); - continue; - } - - for (int i = 0; i < avail; ++i) { - if (!notifications.hasNext()) { - notEmpty.signal(); - return true; - } - - queue.add(notifications.next()); - } - } - } finally { - lock.unlock(); - } - } - - @GuardedBy("lock") - private boolean waitForQueue() { - long timeout = TASK_WAIT_NANOS; - - while (queue.isEmpty()) { - if (timeout <= 0) { - return false; - } - - try { - timeout = notEmpty.awaitNanos(timeout); - } catch (InterruptedException e) { - // The executor is probably shutting down so log as debug. - LOG.debug("{}: Interrupted trying to remove from {} listener's queue", name, listenerKey); - return false; - } - } - - return true; - } - - @Override - public void run() { - try { - // Loop until we've dispatched all the notifications in the queue. - while (true) { - final @NonNull ImmutableList notifications; - - lock.lock(); - try { - if (!waitForQueue()) { - exiting = true; - break; - } - - // Splice the entire queue - notifications = ImmutableList.copyOf(queue); - queue.clear(); - - notFull.signalAll(); - } finally { - lock.unlock(); - } - - invokeListener(notifications); - } - } finally { - // We're exiting, gracefully or not - either way make sure we always remove - // ourselves from the cache. - listenerCache.remove(listenerKey, this); - } - } - - @SuppressWarnings("checkstyle:illegalCatch") - private void invokeListener(final @NonNull ImmutableList notifications) { - LOG.debug("{}: Invoking listener {} with notification: {}", name, listenerKey, notifications); - try { - listenerInvoker.invokeListener(listenerKey.getDelegate(), notifications); - } catch (Exception e) { - // We'll let a RuntimeException from the listener slide and keep sending any remaining notifications. - LOG.error("{}: Error notifying listener {} with {}", name, listenerKey, notifications, e); - } - } - } } diff --git a/common/util/src/main/java/org/opendaylight/yangtools/util/concurrent/QueuedNotificationManagerMXBeanImpl.java b/common/util/src/main/java/org/opendaylight/yangtools/util/concurrent/QueuedNotificationManagerMXBeanImpl.java index c9a2c2db6c..309a84e609 100644 --- a/common/util/src/main/java/org/opendaylight/yangtools/util/concurrent/QueuedNotificationManagerMXBeanImpl.java +++ b/common/util/src/main/java/org/opendaylight/yangtools/util/concurrent/QueuedNotificationManagerMXBeanImpl.java @@ -12,9 +12,9 @@ import static java.util.Objects.requireNonNull; import java.util.List; final class QueuedNotificationManagerMXBeanImpl implements QueuedNotificationManagerMXBean { - private final QueuedNotificationManager manager; + private final AbstractQueuedNotificationManager manager; - QueuedNotificationManagerMXBeanImpl(final QueuedNotificationManager manager) { + QueuedNotificationManagerMXBeanImpl(final AbstractQueuedNotificationManager manager) { this.manager = requireNonNull(manager); } -- 2.36.6