Separate out {Identity,Equality}QueuedNotificationManager 35/85735/2
authorRobert Varga <robert.varga@pantheon.tech>
Fri, 16 Aug 2019 16:00:47 +0000 (18:00 +0200)
committerRobert Varga <robert.varga@pantheon.tech>
Sat, 9 Nov 2019 03:53:11 +0000 (04:53 +0100)
There are use cases where we would like to use QNM on equality, i.e.
we do not realy have a listener instance, but rather a key to somewhere
else. That other place operates on equality, which makes it incompatible
with the current semantics.

This splits out the baseline queueing logic in from NotificationManager
interface, so that it can be reused to implement different execution
concepts.

QueuedNotificationManager is realized on top of it, so that the baseline
interface is implemented in AbstractQueuedNotificationManager and then
specialized to different lookups.

JIRA: YANGTOOLS-1038
Change-Id: I1123d21664a20ff380ccb7db7096801915491f6e
Signed-off-by: Robert Varga <robert.varga@pantheon.tech>
(cherry picked from commit 807d0868fa0e9b81332ea12d8fe19f81f88efc08)

common/util/src/main/java/org/opendaylight/yangtools/util/concurrent/AbstractBatchingExecutor.java [new file with mode: 0644]
common/util/src/main/java/org/opendaylight/yangtools/util/concurrent/AbstractQueuedNotificationManager.java [new file with mode: 0644]
common/util/src/main/java/org/opendaylight/yangtools/util/concurrent/EqualityQueuedNotificationManager.java [new file with mode: 0644]
common/util/src/main/java/org/opendaylight/yangtools/util/concurrent/IdentityQueuedNotificationManager.java [new file with mode: 0644]
common/util/src/main/java/org/opendaylight/yangtools/util/concurrent/NotificationManager.java
common/util/src/main/java/org/opendaylight/yangtools/util/concurrent/QueuedNotificationManager.java
common/util/src/main/java/org/opendaylight/yangtools/util/concurrent/QueuedNotificationManagerMXBeanImpl.java

diff --git a/common/util/src/main/java/org/opendaylight/yangtools/util/concurrent/AbstractBatchingExecutor.java b/common/util/src/main/java/org/opendaylight/yangtools/util/concurrent/AbstractBatchingExecutor.java
new file mode 100644 (file)
index 0000000..7b40ea3
--- /dev/null
@@ -0,0 +1,312 @@
+/*
+ * Copyright (c) 2014 Brocade Communications Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.yangtools.util.concurrent;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static java.util.Objects.requireNonNull;
+
+import com.google.common.collect.ImmutableList;
+import java.util.ArrayDeque;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.Executor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.stream.Stream;
+import org.checkerframework.checker.lock.qual.GuardedBy;
+import org.eclipse.jdt.annotation.NonNull;
+import org.opendaylight.yangtools.concepts.AbstractIdentifiable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This class manages queuing and dispatching tasks for multiple workers concurrently. Tasks are queued on a per-worker
+ * basis and dispatched serially to each worker via an {@link Executor}.
+ *
+ * <p>This class optimizes its memory footprint by only allocating and maintaining a queue and executor
+ * task for a worker when there are pending tasks. On the first task(s), a queue is created and a dispatcher task is
+ * submitted to the executor to dispatch the queue to the associated worker. Any subsequent tasks that occur before all
+ * previous tasks have been dispatched are appended to the existing queue. When all tasks have been dispatched,
+ * the queue and dispatcher task are discarded.
+ *
+ * @author Thomas Pantelis
+ * @author Robert Varga
+ *
+ * @param <K> worker key type
+ * @param <T> task type
+ */
+abstract class AbstractBatchingExecutor<K, T> extends AbstractIdentifiable<String> {
+    private static final Logger LOG = LoggerFactory.getLogger(AbstractBatchingExecutor.class);
+
+    /**
+     * Caps the maximum number of attempts to offer a task to a particular worker. Each attempt window is 1 minute, so
+     * an offer times out after roughly 10 minutes.
+     */
+    private static final int MAX_NOTIFICATION_OFFER_MINUTES = 10;
+    private static final long GIVE_UP_NANOS = TimeUnit.MINUTES.toNanos(MAX_NOTIFICATION_OFFER_MINUTES);
+    private static final long TASK_WAIT_NANOS = TimeUnit.MILLISECONDS.toNanos(10);
+
+    private final ConcurrentMap<K, DispatcherTask> dispatcherTasks = new ConcurrentHashMap<>();
+    private final @NonNull Executor executor;
+    private final int maxQueueCapacity;
+
+    AbstractBatchingExecutor(final @NonNull String name, final @NonNull Executor executor, final int maxQueueCapacity) {
+        super(name);
+        this.executor = requireNonNull(executor);
+        checkArgument(maxQueueCapacity > 0, "Invalid maxQueueCapacity %s must be > 0", maxQueueCapacity);
+        this.maxQueueCapacity = maxQueueCapacity;
+    }
+
+    /**
+     * Returns the maximum worker queue capacity.
+     */
+    final int maxQueueCapacity() {
+        return maxQueueCapacity;
+    }
+
+    /**
+     * Returns the {@link Executor} to used for dispatcher tasks.
+     */
+    final @NonNull Executor executor() {
+        return executor;
+    }
+
+    // FIXME: YANGTOOLS-1016: allow explicit blocking control
+    final void submitTask(final K key, final T task) {
+        submitTasks(key, Collections.singletonList(requireNonNull(task)));
+    }
+
+    // FIXME: YANGTOOLS-1016: allow explicit blocking control with return of un-enqueued tasks (or removal from input)
+    final void submitTasks(final K key, final Iterable<T> tasks) {
+        if (tasks == null || key == null) {
+            return;
+        }
+
+        LOG.trace("{}: submitTasks for worker {}: {}", getIdentifier(), key, tasks);
+
+        // Keep looping until we are either able to add a new DispatcherTask or are able to add our tasks to an existing
+        // DispatcherTask. Eventually one or the other will occur.
+        try {
+            Iterator<T> it = tasks.iterator();
+
+            while (true) {
+                DispatcherTask task = dispatcherTasks.get(key);
+                if (task == null) {
+                    // No task found, try to insert a new one
+                    final DispatcherTask newTask = new DispatcherTask(key, it);
+                    task = dispatcherTasks.putIfAbsent(key, newTask);
+                    if (task == null) {
+                        // We were able to put our new task - now submit it to the executor and we're done. If it throws
+                        // a RejectedExecutionException, let that propagate to the caller.
+                        runTask(key, newTask);
+                        break;
+                    }
+
+                    // We have a racing task, hence we can continue, but we need to refresh our iterator from the task.
+                    it = newTask.recoverItems();
+                }
+
+                final boolean completed = task.submitTasks(it);
+                if (!completed) {
+                    // Task is indicating it is exiting before it has consumed all the items and is exiting. Rather
+                    // than spinning on removal, we try to replace it.
+                    final DispatcherTask newTask = new DispatcherTask(key, it);
+                    if (dispatcherTasks.replace(key, task, newTask)) {
+                        runTask(key, newTask);
+                        break;
+                    }
+
+                    // We failed to replace the task, hence we need retry. Note we have to recover the items to be
+                    // published from the new task.
+                    it = newTask.recoverItems();
+                    LOG.debug("{}: retrying task queueing for {}", getIdentifier(), key);
+                    continue;
+                }
+
+                // All tasks have either been delivered or we have timed out and warned about the ones we have failed
+                // to deliver. In any case we are done here.
+                break;
+            }
+        } catch (InterruptedException e) {
+            // We were interrupted trying to offer to the worker's queue. Somebody's probably telling us to quit.
+            LOG.warn("{}: Interrupted trying to add to {} worker's queue", getIdentifier(), key);
+        }
+
+        LOG.trace("{}: submitTasks done for worker {}", getIdentifier(), key);
+    }
+
+    final Stream<DispatcherTask> streamTasks() {
+        return dispatcherTasks.values().stream();
+    }
+
+    abstract void executeBatch(K key, @NonNull ImmutableList<T> tasks) throws Exception;
+
+    private void runTask(final K key, final DispatcherTask task) {
+        LOG.debug("{}: Submitting DispatcherTask for worker {}", getIdentifier(), key);
+        executor.execute(task);
+    }
+
+    /**
+     * Executor task for a single worker that queues tasks and sends them serially to the worker.
+     */
+    final class DispatcherTask implements Runnable {
+        private final Lock lock = new ReentrantLock();
+        private final Condition notEmpty = lock.newCondition();
+        private final Condition notFull = lock.newCondition();
+        private final @NonNull K key;
+
+        @GuardedBy("lock")
+        private final Queue<T> queue = new ArrayDeque<>();
+        @GuardedBy("lock")
+        private boolean exiting;
+
+        DispatcherTask(final @NonNull K key, final @NonNull Iterator<T> tasks) {
+            this.key = requireNonNull(key);
+            while (tasks.hasNext()) {
+                final T task = tasks.next();
+                if (task != null) {
+                    queue.add(task);
+                }
+            }
+        }
+
+        @NonNull Iterator<T> recoverItems() {
+            // This violates @GuardedBy annotation, but is invoked only when the task is not started and will never
+            // get started, hence this is safe.
+            return queue.iterator();
+        }
+
+        @NonNull K key() {
+            return key;
+        }
+
+        int size() {
+            lock.lock();
+            try {
+                return queue.size();
+            } finally {
+                lock.unlock();
+            }
+        }
+
+        boolean submitTasks(final @NonNull Iterator<T> tasks) throws InterruptedException {
+            final long start = System.nanoTime();
+            final long deadline = start + GIVE_UP_NANOS;
+
+            lock.lock();
+            try {
+                // Lock may have blocked for some time, we need to take that into account. We may have exceeded
+                // the deadline, but that is unlikely and even in that case we can make some progress without further
+                // blocking.
+                long canWait = deadline - System.nanoTime();
+
+                while (true) {
+                    // Check the exiting flag - if true then #run is in the process of exiting so return false
+                    // to indicate such. Otherwise, offer the tasks to the queue.
+                    if (exiting) {
+                        return false;
+                    }
+
+                    final int avail = maxQueueCapacity - queue.size();
+                    if (avail <= 0) {
+                        if (canWait <= 0) {
+                            LOG.warn("{}: Failed to offer tasks {} to the queue for worker {}. Exceeded "
+                                + "maximum allowable time of {} minutes; the worker is likely in an unrecoverable "
+                                + "state (deadlock or endless loop). ", getIdentifier(), ImmutableList.copyOf(tasks),
+                                key, MAX_NOTIFICATION_OFFER_MINUTES);
+                            return true;
+                        }
+
+                        canWait = notFull.awaitNanos(canWait);
+                        continue;
+                    }
+
+                    for (int i = 0; i < avail; ++i) {
+                        if (!tasks.hasNext()) {
+                            notEmpty.signal();
+                            return true;
+                        }
+
+                        queue.add(tasks.next());
+                    }
+                }
+            } finally {
+                lock.unlock();
+            }
+        }
+
+        @GuardedBy("lock")
+        private boolean waitForQueue() {
+            long timeout = TASK_WAIT_NANOS;
+
+            while (queue.isEmpty()) {
+                if (timeout <= 0) {
+                    return false;
+                }
+
+                try {
+                    timeout = notEmpty.awaitNanos(timeout);
+                } catch (InterruptedException e) {
+                    // The executor is probably shutting down so log as debug.
+                    LOG.debug("{}: Interrupted trying to remove from {} worker's queue", getIdentifier(), key);
+                    return false;
+                }
+            }
+
+            return true;
+        }
+
+        @Override
+        public void run() {
+            try {
+                // Loop until we've dispatched all the tasks in the queue.
+                while (true) {
+                    final @NonNull ImmutableList<T> tasks;
+
+                    lock.lock();
+                    try {
+                        if (!waitForQueue()) {
+                            exiting = true;
+                            break;
+                        }
+
+                        // Splice the entire queue
+                        tasks = ImmutableList.copyOf(queue);
+                        queue.clear();
+
+                        notFull.signalAll();
+                    } finally {
+                        lock.unlock();
+                    }
+
+                    invokeWorker(tasks);
+                }
+            } finally {
+                // We're exiting, gracefully or not - either way make sure we always remove
+                // ourselves from the cache.
+                dispatcherTasks.remove(key, this);
+            }
+        }
+
+        @SuppressWarnings("checkstyle:illegalCatch")
+        private void invokeWorker(final @NonNull ImmutableList<T> tasks) {
+            LOG.debug("{}: Invoking worker {} with tasks: {}", getIdentifier(), key, tasks);
+            try {
+                executeBatch(key, tasks);
+            } catch (Exception e) {
+                // We'll let a RuntimeException from the worker slide and keep sending any remaining tasks.
+                LOG.error("{}: Error invoking worker {} with {}", getIdentifier(), key, tasks, e);
+            }
+        }
+    }
+}
diff --git a/common/util/src/main/java/org/opendaylight/yangtools/util/concurrent/AbstractQueuedNotificationManager.java b/common/util/src/main/java/org/opendaylight/yangtools/util/concurrent/AbstractQueuedNotificationManager.java
new file mode 100644 (file)
index 0000000..50f7837
--- /dev/null
@@ -0,0 +1,106 @@
+/*
+ * Copyright (c) 2014 Brocade Communications Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.yangtools.util.concurrent;
+
+import static java.util.Objects.requireNonNull;
+
+import com.google.common.collect.ImmutableList;
+import java.util.List;
+import java.util.concurrent.Executor;
+import java.util.stream.Collectors;
+import org.eclipse.jdt.annotation.NonNull;
+import org.eclipse.jdt.annotation.NonNullByDefault;
+import org.eclipse.jdt.annotation.Nullable;
+import org.opendaylight.yangtools.util.concurrent.QueuedNotificationManager.BatchedInvoker;
+
+/**
+ * This class manages queuing and dispatching notifications for multiple listeners concurrently.
+ * Notifications are queued on a per-listener basis and dispatched serially to each listener via an
+ * {@link Executor}.
+ *
+ * <p>This class optimizes its memory footprint by only allocating and maintaining a queue and executor
+ * task for a listener when there are pending notifications. On the first notification(s), a queue
+ * is created and a task is submitted to the executor to dispatch the queue to the associated
+ * listener. Any subsequent notifications that occur before all previous notifications have been
+ * dispatched are appended to the existing queue. When all notifications have been dispatched, the
+ * queue and task are discarded.
+ *
+ * @author Thomas Pantelis
+ *
+ * @param <L> the listener type
+ * @param <N> the notification type
+ */
+@NonNullByDefault
+abstract class AbstractQueuedNotificationManager<T, L, N> extends AbstractBatchingExecutor<T, N>
+        implements NotificationManager<L, N> {
+
+    private final QueuedNotificationManagerMXBean mxBean = new QueuedNotificationManagerMXBeanImpl(this);
+    private final BatchedInvoker<L, N> listenerInvoker;
+
+    AbstractQueuedNotificationManager(final String name, final Executor executor, final int maxQueueCapacity,
+            final BatchedInvoker<L, N> listenerInvoker) {
+        super(name, executor, maxQueueCapacity);
+        this.listenerInvoker = requireNonNull(listenerInvoker);
+    }
+
+    /**
+     * Returns the {@link Executor} to used for notification tasks.
+     */
+    public final Executor getExecutor() {
+        return executor();
+    }
+
+    /**
+     * Returns the maximum listener queue capacity.
+     */
+    public final int getMaxQueueCapacity() {
+        return maxQueueCapacity();
+    }
+
+    /**
+     * Return an {@link QueuedNotificationManagerMXBean} tied to this instance.
+     *
+     * @return An QueuedNotificationManagerMXBean object.
+     */
+    public final QueuedNotificationManagerMXBean getMXBean() {
+        return mxBean;
+    }
+
+    /**
+     * Returns {@link ListenerNotificationQueueStats} instances for each current listener
+     * notification task in progress.
+     */
+    // FIXME: drop visibility to package-protected
+    public final List<ListenerNotificationQueueStats> getListenerNotificationQueueStats() {
+        return streamTasks().map(t -> new ListenerNotificationQueueStats(t.key().toString(), t.size()))
+                .collect(Collectors.toList());
+    }
+
+    @Override
+    public final void submitNotification(final L listener, final N notification) {
+        if (listener != null && notification != null) {
+            submitTask(wrap(listener), notification);
+        }
+    }
+
+    @Override
+    public final void submitNotifications(final L listener, final @Nullable Iterable<N> notifications) {
+        if (listener != null && notifications != null) {
+            submitTasks(wrap(listener), notifications);
+        }
+    }
+
+    @Override
+    final void executeBatch(final T key, final @NonNull ImmutableList<N> tasks) {
+        listenerInvoker.invokeListener(unwrap(key), tasks);
+    }
+
+    abstract T wrap(L listener);
+
+    abstract L unwrap(T key);
+}
diff --git a/common/util/src/main/java/org/opendaylight/yangtools/util/concurrent/EqualityQueuedNotificationManager.java b/common/util/src/main/java/org/opendaylight/yangtools/util/concurrent/EqualityQueuedNotificationManager.java
new file mode 100644 (file)
index 0000000..6531c5e
--- /dev/null
@@ -0,0 +1,32 @@
+/*
+ * Copyright (c) 2019 PANTHEON.tech, s.r.o. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.yangtools.util.concurrent;
+
+import com.google.common.annotations.Beta;
+import java.util.concurrent.Executor;
+import org.eclipse.jdt.annotation.NonNullByDefault;
+import org.opendaylight.yangtools.util.concurrent.QueuedNotificationManager.BatchedInvoker;
+
+@Beta
+@NonNullByDefault
+public final class EqualityQueuedNotificationManager<L, N> extends AbstractQueuedNotificationManager<L, L, N> {
+    public EqualityQueuedNotificationManager(final String name, final Executor executor, final int maxQueueCapacity,
+            final BatchedInvoker<L, N> listenerInvoker) {
+        super(name, executor, maxQueueCapacity, listenerInvoker);
+    }
+
+    @Override
+    L wrap(final L listener) {
+        return listener;
+    }
+
+    @Override
+    L unwrap(final L key) {
+        return key;
+    }
+}
diff --git a/common/util/src/main/java/org/opendaylight/yangtools/util/concurrent/IdentityQueuedNotificationManager.java b/common/util/src/main/java/org/opendaylight/yangtools/util/concurrent/IdentityQueuedNotificationManager.java
new file mode 100644 (file)
index 0000000..c228680
--- /dev/null
@@ -0,0 +1,34 @@
+/*
+ * Copyright (c) 2019 PANTHEON.tech, s.r.o. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.yangtools.util.concurrent;
+
+import com.google.common.annotations.Beta;
+import java.util.concurrent.Executor;
+import org.eclipse.jdt.annotation.NonNullByDefault;
+import org.opendaylight.yangtools.util.ForwardingIdentityObject;
+import org.opendaylight.yangtools.util.concurrent.QueuedNotificationManager.BatchedInvoker;
+
+@Beta
+@NonNullByDefault
+public class IdentityQueuedNotificationManager<L, N>
+        extends AbstractQueuedNotificationManager<ForwardingIdentityObject<L>, L, N> {
+    public IdentityQueuedNotificationManager(final String name, final Executor executor, final int maxQueueCapacity,
+        final BatchedInvoker<L, N> listenerInvoker) {
+        super(name, executor, maxQueueCapacity, listenerInvoker);
+    }
+
+    @Override
+    final ForwardingIdentityObject<L> wrap(final L listener) {
+        return ForwardingIdentityObject.of(listener);
+    }
+
+    @Override
+    final L unwrap(final ForwardingIdentityObject<L> key) {
+        return key.getDelegate();
+    }
+}
index 531adfcb812fb373d35876a0502750c18453e545..3c21d65a6c80ccbe6c72ff819e4da5bbc33c1015 100644 (file)
@@ -41,5 +41,4 @@ public interface NotificationManager<L, N> {
      * @throws RejectedExecutionException if a notification can't be queued for dispatching
      */
     void submitNotifications(L listener, Iterable<N> notifications);
-
 }
index dd0bcb7c1ef3ebf0dba162a3e7f53bba691ea6ac..8839099af0df16d34904a11b34b7613d71fc7717 100644 (file)
@@ -7,47 +7,23 @@
  */
 package org.opendaylight.yangtools.util.concurrent;
 
-import static com.google.common.base.Preconditions.checkArgument;
-import static java.util.Objects.requireNonNull;
-
 import com.google.common.collect.ImmutableList;
-import java.util.ArrayDeque;
-import java.util.Collections;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Queue;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.Executor;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.locks.Condition;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantLock;
-import java.util.stream.Collectors;
-import org.checkerframework.checker.lock.qual.GuardedBy;
 import org.eclipse.jdt.annotation.NonNull;
-import org.opendaylight.yangtools.util.ForwardingIdentityObject;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 /**
- * This class manages queuing and dispatching notifications for multiple listeners concurrently.
- * Notifications are queued on a per-listener basis and dispatched serially to each listener via an
- * {@link Executor}.
+ * {@inheritDoc}
  *
- * <p>This class optimizes its memory footprint by only allocating and maintaining a queue and executor
- * task for a listener when there are pending notifications. On the first notification(s), a queue
- * is created and a task is submitted to the executor to dispatch the queue to the associated
- * listener. Any subsequent notifications that occur before all previous notifications have been
- * dispatched are appended to the existing queue. When all notifications have been dispatched, the
- * queue and task are discarded.
+ * <p>
+ * This class is pessimistic about listener type and uses identity mapping for comparing them. This is defensive versus
+ * reused objects, maintaining semantics. This may not always be intended, for example if {@code L} is a {@code String}
+ * which is being dynamically determined. In that case we do not want to use identity, but equality, as otherwise
+ * the caller is forced to use {@link String#intern()} -- leading to interning in lookup, which is absolutely
+ * unnecessary. In such use cases, use {@link EqualityQueuedNotificationManager} instead.
  *
  * @author Thomas Pantelis
- *
- * @param <L> the listener type
- * @param <N> the notification type
  */
-public final class QueuedNotificationManager<L, N> implements NotificationManager<L, N> {
+public final class QueuedNotificationManager<L, N> extends IdentityQueuedNotificationManager<L, N> {
     @FunctionalInterface
     public interface BatchedInvoker<L, N> {
         /**
@@ -59,39 +35,9 @@ public final class QueuedNotificationManager<L, N> implements NotificationManage
         void invokeListener(@NonNull L listener, @NonNull ImmutableList<N> notifications);
     }
 
-    private static final Logger LOG = LoggerFactory.getLogger(QueuedNotificationManager.class);
-
-    /**
-     * Caps the maximum number of attempts to offer notification to a particular listener.  Each
-     * attempt window is 1 minute, so an offer times out after roughly 10 minutes.
-     */
-    private static final int MAX_NOTIFICATION_OFFER_MINUTES = 10;
-    private static final long GIVE_UP_NANOS = TimeUnit.MINUTES.toNanos(MAX_NOTIFICATION_OFFER_MINUTES);
-    private static final long TASK_WAIT_NANOS = TimeUnit.MILLISECONDS.toNanos(10);
-
-    /**
-     * We key by listener reference identity hashCode/equals.
-     * Since we don't know anything about the listener class implementations and we're mixing
-     * multiple listener class instances in the same map, this avoids any potential issue with an
-     * equals implementation that just blindly casts the other Object to compare instead of checking
-     * for instanceof.
-     */
-    private final ConcurrentMap<ForwardingIdentityObject<L>, NotificationTask> listenerCache =
-            new ConcurrentHashMap<>();
-    private final @NonNull QueuedNotificationManagerMXBean mxBean = new QueuedNotificationManagerMXBeanImpl(this);
-    private final @NonNull BatchedInvoker<L, N> listenerInvoker;
-    private final @NonNull Executor executor;
-    private final @NonNull String name;
-    private final int maxQueueCapacity;
-
-    private QueuedNotificationManager(final @NonNull Executor executor,
-            final @NonNull BatchedInvoker<L, N> listenerInvoker, final int maxQueueCapacity,
-            final @NonNull String name) {
-        checkArgument(maxQueueCapacity > 0, "Invalid maxQueueCapacity %s must be > 0", maxQueueCapacity);
-        this.executor = requireNonNull(executor);
-        this.listenerInvoker = requireNonNull(listenerInvoker);
-        this.maxQueueCapacity = maxQueueCapacity;
-        this.name = requireNonNull(name);
+    QueuedNotificationManager(final @NonNull Executor executor, final @NonNull BatchedInvoker<L, N> listenerInvoker,
+            final int maxQueueCapacity, final @NonNull String name) {
+        super(name, executor, maxQueueCapacity, listenerInvoker);
     }
 
     /**
@@ -107,269 +53,4 @@ public final class QueuedNotificationManager<L, N> implements NotificationManage
             final @NonNull String name) {
         return new QueuedNotificationManager<>(executor, listenerInvoker, maxQueueCapacity, name);
     }
-
-    /**
-     * Returns the maximum listener queue capacity.
-     */
-    public int getMaxQueueCapacity() {
-        return maxQueueCapacity;
-    }
-
-    /**
-     * Return an {@link QueuedNotificationManagerMXBean} tied to this instance.
-     *
-     * @return An QueuedNotificationManagerMXBean object.
-     */
-    public @NonNull QueuedNotificationManagerMXBean getMXBean() {
-        return mxBean;
-    }
-
-    /**
-     * Returns the {@link Executor} to used for notification tasks.
-     */
-    public @NonNull Executor getExecutor() {
-        return executor;
-    }
-
-    /* (non-Javadoc)
-     * @see org.opendaylight.yangtools.util.concurrent.NotificationManager#addNotification(L, N)
-     */
-    @Override
-    public void submitNotification(final L listener, final N notification) {
-        if (notification != null) {
-            submitNotifications(listener, Collections.singletonList(notification));
-        }
-    }
-
-    /* (non-Javadoc)
-     * @see org.opendaylight.yangtools.util.concurrent.NotificationManager#submitNotifications(L, java.util.Collection)
-     */
-    @Override
-    public void submitNotifications(final L listener, final Iterable<N> notifications) {
-
-        if (notifications == null || listener == null) {
-            return;
-        }
-
-        LOG.trace("{}: submitNotifications for listener {}: {}", name, listener, notifications);
-
-        final ForwardingIdentityObject<L> key = ForwardingIdentityObject.of(listener);
-
-        // Keep looping until we are either able to add a new NotificationTask or are able to
-        // add our notifications to an existing NotificationTask. Eventually one or the other
-        // will occur.
-        try {
-            Iterator<N> it = notifications.iterator();
-
-            while (true) {
-                NotificationTask task = listenerCache.get(key);
-                if (task == null) {
-                    // No task found, try to insert a new one
-                    final NotificationTask newTask = new NotificationTask(key, it);
-                    task = listenerCache.putIfAbsent(key, newTask);
-                    if (task == null) {
-                        // We were able to put our new task - now submit it to the executor and
-                        // we're done. If it throws a RejectedExecutionException, let that propagate
-                        // to the caller.
-                        runTask(listener, newTask);
-                        break;
-                    }
-
-                    // We have a racing task, hence we can continue, but we need to refresh our iterator from
-                    // the task.
-                    it = newTask.recoverItems();
-                }
-
-                final boolean completed = task.submitNotifications(it);
-                if (!completed) {
-                    // Task is indicating it is exiting before it has consumed all the items and is exiting. Rather
-                    // than spinning on removal, we try to replace it.
-                    final NotificationTask newTask = new NotificationTask(key, it);
-                    if (listenerCache.replace(key, task, newTask)) {
-                        runTask(listener, newTask);
-                        break;
-                    }
-
-                    // We failed to replace the task, hence we need retry. Note we have to recover the items to be
-                    // published from the new task.
-                    it = newTask.recoverItems();
-                    LOG.debug("{}: retrying task queueing for {}", name, listener);
-                    continue;
-                }
-
-                // All notifications have either been delivered or we have timed out and warned about the ones we
-                // have failed to deliver. In any case we are done here.
-                break;
-            }
-        } catch (InterruptedException e) {
-            // We were interrupted trying to offer to the listener's queue. Somebody's probably
-            // telling us to quit.
-            LOG.warn("{}: Interrupted trying to add to {} listener's queue", name, listener);
-        }
-
-        LOG.trace("{}: submitNotifications done for listener {}", name, listener);
-    }
-
-    /**
-     * Returns {@link ListenerNotificationQueueStats} instances for each current listener
-     * notification task in progress.
-     */
-    public List<ListenerNotificationQueueStats> getListenerNotificationQueueStats() {
-        return listenerCache.values().stream().map(t -> new ListenerNotificationQueueStats(t.listenerKey.toString(),
-            t.size())).collect(Collectors.toList());
-    }
-
-    private void runTask(final L listener, final NotificationTask task) {
-        LOG.debug("{}: Submitting NotificationTask for listener {}", name, listener);
-        executor.execute(task);
-    }
-
-    /**
-     * Executor task for a single listener that queues notifications and sends them serially to the
-     * listener.
-     */
-    private class NotificationTask implements Runnable {
-        private final Lock lock = new ReentrantLock();
-        private final Condition notEmpty = lock.newCondition();
-        private final Condition notFull = lock.newCondition();
-        private final @NonNull ForwardingIdentityObject<L> listenerKey;
-
-        @GuardedBy("lock")
-        private final Queue<N> queue = new ArrayDeque<>();
-        @GuardedBy("lock")
-        private boolean exiting;
-
-        NotificationTask(final @NonNull ForwardingIdentityObject<L> listenerKey,
-                final @NonNull Iterator<N> notifications) {
-            this.listenerKey = requireNonNull(listenerKey);
-            while (notifications.hasNext()) {
-                queue.add(notifications.next());
-            }
-        }
-
-        @NonNull Iterator<N> recoverItems() {
-            // This violates @GuardedBy annotation, but is invoked only when the task is not started and will never
-            // get started, hence this is safe.
-            return queue.iterator();
-        }
-
-        int size() {
-            lock.lock();
-            try {
-                return queue.size();
-            } finally {
-                lock.unlock();
-            }
-        }
-
-        boolean submitNotifications(final @NonNull Iterator<N> notifications) throws InterruptedException {
-            final long start = System.nanoTime();
-            final long deadline = start + GIVE_UP_NANOS;
-
-            lock.lock();
-            try {
-                // Lock may have blocked for some time, we need to take that into account. We may have exceedded
-                // the deadline, but that is unlikely and even in that case we can make some progress without further
-                // blocking.
-                long canWait = deadline - System.nanoTime();
-
-                while (true) {
-                    // Check the exiting flag - if true then #run is in the process of exiting so return
-                    // false to indicate such. Otherwise, offer the notifications to the queue.
-                    if (exiting) {
-                        return false;
-                    }
-
-                    final int avail = maxQueueCapacity - queue.size();
-                    if (avail <= 0) {
-                        if (canWait <= 0) {
-                            LOG.warn("{}: Failed to offer notifications {} to the queue for listener {}. Exceeded"
-                                + "maximum allowable time of {} minutes; the listener is likely in an unrecoverable"
-                                + "state (deadlock or endless loop). ", name, ImmutableList.copyOf(notifications),
-                                listenerKey, MAX_NOTIFICATION_OFFER_MINUTES);
-                            return true;
-                        }
-
-                        canWait = notFull.awaitNanos(canWait);
-                        continue;
-                    }
-
-                    for (int i = 0; i < avail; ++i) {
-                        if (!notifications.hasNext()) {
-                            notEmpty.signal();
-                            return true;
-                        }
-
-                        queue.add(notifications.next());
-                    }
-                }
-            } finally {
-                lock.unlock();
-            }
-        }
-
-        @GuardedBy("lock")
-        private boolean waitForQueue() {
-            long timeout = TASK_WAIT_NANOS;
-
-            while (queue.isEmpty()) {
-                if (timeout <= 0) {
-                    return false;
-                }
-
-                try {
-                    timeout = notEmpty.awaitNanos(timeout);
-                } catch (InterruptedException e) {
-                    // The executor is probably shutting down so log as debug.
-                    LOG.debug("{}: Interrupted trying to remove from {} listener's queue", name, listenerKey);
-                    return false;
-                }
-            }
-
-            return true;
-        }
-
-        @Override
-        public void run() {
-            try {
-                // Loop until we've dispatched all the notifications in the queue.
-                while (true) {
-                    final @NonNull ImmutableList<N> notifications;
-
-                    lock.lock();
-                    try {
-                        if (!waitForQueue()) {
-                            exiting = true;
-                            break;
-                        }
-
-                        // Splice the entire queue
-                        notifications = ImmutableList.copyOf(queue);
-                        queue.clear();
-
-                        notFull.signalAll();
-                    } finally {
-                        lock.unlock();
-                    }
-
-                    invokeListener(notifications);
-                }
-            } finally {
-                // We're exiting, gracefully or not - either way make sure we always remove
-                // ourselves from the cache.
-                listenerCache.remove(listenerKey, this);
-            }
-        }
-
-        @SuppressWarnings("checkstyle:illegalCatch")
-        private void invokeListener(final @NonNull ImmutableList<N> notifications) {
-            LOG.debug("{}: Invoking listener {} with notification: {}", name, listenerKey, notifications);
-            try {
-                listenerInvoker.invokeListener(listenerKey.getDelegate(), notifications);
-            } catch (Exception e) {
-                // We'll let a RuntimeException from the listener slide and keep sending any remaining notifications.
-                LOG.error("{}: Error notifying listener {} with {}", name, listenerKey, notifications, e);
-            }
-        }
-    }
 }
index c9a2c2db6c87d0c55368052a83a8ccdb6bb209fb..309a84e60961a18228965e564f649f9b05c4a96a 100644 (file)
@@ -12,9 +12,9 @@ import static java.util.Objects.requireNonNull;
 import java.util.List;
 
 final class QueuedNotificationManagerMXBeanImpl implements QueuedNotificationManagerMXBean {
-    private final QueuedNotificationManager<?, ?> manager;
+    private final AbstractQueuedNotificationManager<?, ?, ?> manager;
 
-    QueuedNotificationManagerMXBeanImpl(final QueuedNotificationManager<?, ?> manager) {
+    QueuedNotificationManagerMXBeanImpl(final AbstractQueuedNotificationManager<?, ?, ?> manager) {
         this.manager = requireNonNull(manager);
     }