Allow QueuedNotificationManager to batch notifications 94/46094/7
authorRobert Varga <rovarga@cisco.com>
Wed, 21 Sep 2016 15:29:57 +0000 (17:29 +0200)
committerRobert Varga <rovarga@cisco.com>
Fri, 23 Sep 2016 09:27:08 +0000 (11:27 +0200)
This patch reworks the queueing logic so that notifications
get completely read from the queue and then propagated to the
listener invoker in one go.

Change-Id: I1a4caaa75f7a206abf4ff5b6829918bd25d79551
Signed-off-by: Robert Varga <rovarga@cisco.com>
common/util/src/main/java/org/opendaylight/yangtools/util/concurrent/QueuedNotificationManager.java
common/util/src/test/java/org/opendaylight/yangtools/util/concurrent/QueuedNotificationManagerTest.java

index 21a63ca2532b6295bdd115194e3a6af2024af55e..5160efe9bf7ccd785a9455335973605906999f61 100644 (file)
@@ -10,16 +10,18 @@ package org.opendaylight.yangtools.util.concurrent;
 
 import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableList;
+import java.util.ArrayDeque;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.Iterator;
 import java.util.List;
-import java.util.concurrent.BlockingQueue;
+import java.util.Queue;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.Executor;
-import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Condition;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
 import java.util.stream.Collectors;
@@ -86,7 +88,9 @@ public class QueuedNotificationManager<L, N> implements NotificationManager<L, N
      * Caps the maximum number of attempts to offer notification to a particular listener.  Each
      * attempt window is 1 minute, so an offer times out after roughly 10 minutes.
      */
-    private static final int MAX_NOTIFICATION_OFFER_ATTEMPTS = 10;
+    private static final int MAX_NOTIFICATION_OFFER_MINUTES = 10;
+    private static final long GIVE_UP_NANOS = TimeUnit.MINUTES.toNanos(MAX_NOTIFICATION_OFFER_MINUTES);
+    private static final long TASK_WAIT_NANOS = TimeUnit.MILLISECONDS.toNanos(10);
 
     private final ConcurrentMap<ListenerKey<L>, NotificationTask> listenerCache = new ConcurrentHashMap<>();
     private final BatchedInvoker<L, N> listenerInvoker;
@@ -116,8 +120,14 @@ public class QueuedNotificationManager<L, N> implements NotificationManager<L, N
     @Deprecated
     public QueuedNotificationManager(final Executor executor, final Invoker<L, N> listenerInvoker,
             final int maxQueueCapacity, final String name) {
-        this(executor, (BatchedInvoker<L, N>)(l, c) -> c.forEach(n -> listenerInvoker.invokeListener(l, n)),
-            maxQueueCapacity, name);
+        this(executor, (BatchedInvoker<L, N>)(l, c) -> c.forEach(n -> {
+            try {
+                listenerInvoker.invokeListener(l, n);
+            } catch (Exception e) {
+                LOG.error("{}: Error notifying listener {} with {}", name, l, n, e);
+            }
+
+        }), maxQueueCapacity, name);
         Preconditions.checkNotNull(listenerInvoker);
     }
 
@@ -134,6 +144,20 @@ public class QueuedNotificationManager<L, N> implements NotificationManager<L, N
         return new QueuedNotificationManager<>(executor, listenerInvoker, maxQueueCapacity, name);
     }
 
+    /**
+     * Returns the maximum listener queue capacity.
+     */
+    public int getMaxQueueCapacity() {
+        return maxQueueCapacity;
+    }
+
+    /**
+     * Returns the {@link Executor} to used for notification tasks.
+     */
+    public Executor getExecutor() {
+        return executor;
+    }
+
     /* (non-Javadoc)
      * @see org.opendaylight.yangtools.util.concurrent.NotificationManager#addNotification(L, N)
      */
@@ -163,38 +187,47 @@ public class QueuedNotificationManager<L, N> implements NotificationManager<L, N
         // add our notifications to an existing NotificationTask. Eventually one or the other
         // will occur.
         try {
-            NotificationTask newNotificationTask = null;
+            Iterator<N> it = notifications.iterator();
 
             while (true) {
-                final NotificationTask existingTask = listenerCache.get(key);
-                if (existingTask != null && existingTask.submitNotifications(notifications)) {
-                    // We were able to add our notifications to an existing task so we're done.
-                    break;
+                NotificationTask task = listenerCache.get(key);
+                if (task == null) {
+                    // No task found, try to insert a new one
+                    final NotificationTask newTask = new NotificationTask(key, it);
+                    task = listenerCache.putIfAbsent(key, newTask);
+                    if (task == null) {
+                        // We were able to put our new task - now submit it to the executor and
+                        // we're done. If it throws a RejectedExecutionException, let that propagate
+                        // to the caller.
+                        runTask(listener, newTask);
+                        break;
+                    }
+
+                    // We have a racing task, hence we can continue, but we need to refresh our iterator from
+                    // the task.
+                    it = newTask.recoverItems();
                 }
 
-                // Either there's no existing task or we couldn't add our notifications to the
-                // existing one because it's in the process of exiting and removing itself from
-                // the cache. Either way try to put a new task in the cache. If we can't put
-                // then either the existing one is still there and hasn't removed itself quite
-                // yet or some other concurrent thread beat us to the put although this method
-                // shouldn't be called concurrently for the same listener as that would violate
-                // notification ordering. In any case loop back up and try again.
+                final boolean completed = task.submitNotifications(it);
+                if (!completed) {
+                    // Task is indicating it is exiting before it has consumed all the items and is exiting. Rather
+                    // than spinning on removal, we try to replace it.
+                    final NotificationTask newTask = new NotificationTask(key, it);
+                    if (listenerCache.replace(key, task, newTask)) {
+                        runTask(listener, newTask);
+                        break;
+                    }
 
-                if (newNotificationTask == null) {
-                    newNotificationTask = new NotificationTask(key, notifications);
-                }
-                final NotificationTask oldTask = listenerCache.putIfAbsent(key, newNotificationTask);
-                if (oldTask == null) {
-                    // We were able to put our new task - now submit it to the executor and
-                    // we're done. If it throws a RejectedxecutionException, let that propagate
-                    // to the caller.
-
-                    LOG.debug("{}: Submitting NotificationTask for listener {}", name, listener);
-                    executor.execute(newNotificationTask);
-                    break;
+                    // We failed to replace the task, hence we need retry. Note we have to recover the items to be
+                    // published from the new task.
+                    it = newTask.recoverItems();
+                    LOG.debug("{}: retrying task queueing for {}", name, listener);
+                    continue;
                 }
 
-                LOG.debug("{}: retrying task queueing for {}", name, listener);
+                // All notifications have either been delivered or we have timed out and warned about the ones we
+                // have failed to deliver. In any case we are done here.
+                break;
             }
         } catch (InterruptedException e) {
             // We were interrupted trying to offer to the listener's queue. Somebody's probably
@@ -202,7 +235,7 @@ public class QueuedNotificationManager<L, N> implements NotificationManager<L, N
             LOG.warn("{}: Interrupted trying to add to {} listener's queue", name, listener);
         }
 
-        LOG.trace("{}: submitNotifications dine for listener {}", name, listener);
+        LOG.trace("{}: submitNotifications done for listener {}", name, listener);
     }
 
     /**
@@ -211,21 +244,12 @@ public class QueuedNotificationManager<L, N> implements NotificationManager<L, N
      */
     public List<ListenerNotificationQueueStats> getListenerNotificationQueueStats() {
         return listenerCache.values().stream().map(t -> new ListenerNotificationQueueStats(t.listenerKey.toString(),
-            t.notificationQueue.size())).collect(Collectors.toList());
+            t.size())).collect(Collectors.toList());
     }
 
-    /**
-     * Returns the maximum listener queue capacity.
-     */
-    public int getMaxQueueCapacity() {
-        return maxQueueCapacity;
-    }
-
-    /**
-     * Returns the {@link Executor} to used for notification tasks.
-     */
-    public Executor getExecutor() {
-        return executor;
+    private void runTask(final L listener, final NotificationTask task) {
+        LOG.debug("{}: Submitting NotificationTask for listener {}", name, listener);
+        executor.execute(task);
     }
 
     /**
@@ -270,70 +294,101 @@ public class QueuedNotificationManager<L, N> implements NotificationManager<L, N
      * listener.
      */
     private class NotificationTask implements Runnable {
-        private final Lock queuingLock = new ReentrantLock();
-        private final BlockingQueue<N> notificationQueue;
+
+        private final Lock lock = new ReentrantLock();
+        private final Condition notEmpty = lock.newCondition();
+        private final Condition notFull = lock.newCondition();
         private final ListenerKey<L> listenerKey;
 
-        @GuardedBy("queuingLock")
-        private boolean queuedNotifications = false;
-        private volatile boolean done = false;
+        @GuardedBy("lock")
+        private final Queue<N> queue = new ArrayDeque<>();
+        @GuardedBy("lock")
+        private boolean exiting;
 
-        NotificationTask(final ListenerKey<L> listenerKey, final Iterable<N> notifications) {
+        NotificationTask(final ListenerKey<L> listenerKey, final Iterator<N> notifications) {
             this.listenerKey = Preconditions.checkNotNull(listenerKey);
-            this.notificationQueue = new LinkedBlockingQueue<>(maxQueueCapacity);
+            while (notifications.hasNext()) {
+                queue.offer(notifications.next());
+            }
+        }
 
-            for (N notification: notifications) {
-                this.notificationQueue.add(notification);
+        Iterator<N> recoverItems() {
+            // This violates @GuardedBy annotation, but is invoked only when the task is not started and will never
+            // get started, hence this is safe.
+            return queue.iterator();
+        }
+
+        int size() {
+            lock.lock();
+            try {
+                return queue.size();
+            } finally {
+                lock.unlock();
             }
         }
 
-        @GuardedBy("queuingLock")
-        private void publishNotification(final N notification) throws InterruptedException {
-            // The offer is attempted for up to 10 minutes, with a status message printed each minute
-            for (int notificationOfferAttempts = 0;
-                 notificationOfferAttempts < MAX_NOTIFICATION_OFFER_ATTEMPTS; notificationOfferAttempts++) {
+        boolean submitNotifications(final Iterator<N> notifications) throws InterruptedException {
+            final long start = System.nanoTime();
+            final long deadline = start + GIVE_UP_NANOS;
 
-                // Try to offer for up to a minute and log a message if it times out.
-                LOG.debug("{}: Offering notification to the queue for listener {}: {}", name, listenerKey,
-                    notification);
+            lock.lock();
+            try {
+                // Lock may have blocked for some time, we need to take that into account. We may have exceedded
+                // the deadline, but that is unlikely and even in that case we can make some progress without further
+                // blocking.
+                long canWait = deadline - System.nanoTime();
 
-                if (notificationQueue.offer(notification, 1, TimeUnit.MINUTES)) {
-                    return;
-                }
+                while (true) {
+                    // Check the exiting flag - if true then #run is in the process of exiting so return
+                    // false to indicate such. Otherwise, offer the notifications to the queue.
+                    if (exiting) {
+                        return false;
+                    }
 
-                LOG.warn("{}: Timed out trying to offer a notification to the queue for listener {} "
-                        + "on attempt {} of {}. The queue has reached its capacity of {}", name, listenerKey,
-                        notificationOfferAttempts, MAX_NOTIFICATION_OFFER_ATTEMPTS, maxQueueCapacity);
-            }
+                    final int avail = maxQueueCapacity - queue.size();
+                    if (avail <= 0) {
+                        if (canWait <= 0) {
+                            LOG.warn("{}: Failed to offer notifications {} to the queue for listener {}. Exceeded"
+                                + "maximum allowable time of {} minutes; the listener is likely in an unrecoverable"
+                                + "state (deadlock or endless loop). ", name, ImmutableList.copyOf(notifications),
+                                listenerKey, MAX_NOTIFICATION_OFFER_MINUTES);
+                            return true;
+                        }
 
-            LOG.warn("{}: Failed to offer a notification to the queue for listener {}. Exceeded max allowable attempts"
-                    + " of {} in {} minutes; the listener is likely in an unrecoverable state (deadlock or endless"
-                    + " loop).", name, listenerKey, MAX_NOTIFICATION_OFFER_ATTEMPTS, MAX_NOTIFICATION_OFFER_ATTEMPTS);
-        }
+                        canWait = notFull.awaitNanos(canWait);
+                        continue;
+                    }
 
-        boolean submitNotifications(final Iterable<N> notifications) throws InterruptedException {
+                    for (int i = 0; i < avail; ++i) {
+                        if (!notifications.hasNext()) {
+                            notEmpty.signal();
+                            return true;
+                        }
 
-            queuingLock.lock();
-            try {
+                        queue.offer(notifications.next());
+                    }
+                }
+            } finally {
+                lock.unlock();
+            }
+        }
 
-                // Check the done flag - if true then #run is in the process of exiting so return
-                // false to indicate such. Otherwise, offer the notifications to the queue.
+        @GuardedBy("lock")
+        private boolean waitForQueue() {
+            long timeout = TASK_WAIT_NANOS;
 
-                if (done) {
+            while (queue.isEmpty()) {
+                if (timeout <= 0) {
                     return false;
                 }
 
-                for (N notification : notifications) {
-                    publishNotification(notification);
+                try {
+                    timeout = notEmpty.awaitNanos(timeout);
+                } catch (InterruptedException e) {
+                    // The executor is probably shutting down so log as debug.
+                    LOG.debug("{}: Interrupted trying to remove from {} listener's queue", name, listenerKey);
+                    return false;
                 }
-
-                // Set the queuedNotifications flag to tell #run that we've just queued
-                // notifications and not to exit yet, even if it thinks the queue is empty at this
-                // point.
-
-                queuedNotifications = true;
-            } finally {
-                queuingLock.unlock();
             }
 
             return true;
@@ -343,48 +398,27 @@ public class QueuedNotificationManager<L, N> implements NotificationManager<L, N
         public void run() {
             try {
                 // Loop until we've dispatched all the notifications in the queue.
-
                 while (true) {
-                    // Get the notification at the head of the queue, waiting a little bit for one
-                    // to get offered.
-
-                    final N notification = notificationQueue.poll(10, TimeUnit.MILLISECONDS);
-                    if (notification == null) {
+                    final Collection<N> notifications;
 
-                        // The queue is empty - try to get the queuingLock. If we can't get the lock
-                        // then #submitNotifications is in the process of offering to the queue so
-                        // we'll loop back up and poll the queue again.
-
-                        if (queuingLock.tryLock()) {
-                            try {
-
-                                // Check the queuedNotifications flag to see if #submitNotifications
-                                // has offered new notification(s) to the queue. If so, loop back up
-                                // and poll the queue again. Otherwise set done to true and exit.
-                                // Once we set the done flag and unlock, calls to
-                                // #submitNotifications will fail and a new task will be created.
-
-                                if (!queuedNotifications) {
-                                    done = true;
-                                    break;
-                                }
-
-                                // Clear the queuedNotifications flag so we'll try to exit the next
-                                // time through the loop when the queue is empty.
+                    lock.lock();
+                    try {
+                        if (!waitForQueue()) {
+                            exiting = true;
+                            break;
+                        }
 
-                                queuedNotifications = false;
+                        // Splice the entire queue
+                        notifications = ImmutableList.copyOf(queue);
+                        queue.clear();
 
-                            } finally {
-                                queuingLock.unlock();
-                            }
-                        }
+                        notFull.signalAll();
+                    } finally {
+                        lock.unlock();
                     }
 
-                    notifyListener(notification);
+                    invokeListener(notifications);
                 }
-            } catch (InterruptedException e) {
-                // The executor is probably shutting down so log as debug.
-                LOG.debug("{}: Interrupted trying to remove from {} listener's queue", name, listenerKey);
             } finally {
                 // We're exiting, gracefully or not - either way make sure we always remove
                 // ourselves from the cache.
@@ -392,17 +426,13 @@ public class QueuedNotificationManager<L, N> implements NotificationManager<L, N
             }
         }
 
-        private void notifyListener(final N notification) {
-            if (notification == null) {
-                return;
-            }
-
-            LOG.debug("{}: Invoking listener {} with notification: {}", name, listenerKey, notification);
+        private void invokeListener(final Collection<N> notifications) {
+            LOG.debug("{}: Invoking listener {} with notification: {}", name, listenerKey, notifications);
             try {
-                listenerInvoker.invokeListener(listenerKey.getListener(), ImmutableList.of(notification));
+                listenerInvoker.invokeListener(listenerKey.getListener(), notifications);
             } catch (Exception e) {
                 // We'll let a RuntimeException from the listener slide and keep sending any remaining notifications.
-                LOG.error(String.format("%1$s: Error notifying listener %2$s", name, listenerKey), e);
+                LOG.error("{}: Error notifying listener {} with {}", name, listenerKey, notifications, e);
             }
         }
     }
index 3833b9387365ff23370d72d66b5a1ae073bfaac9..f3275802c5869f0270507a936b473bf9e7477a87 100644 (file)
@@ -10,10 +10,11 @@ package org.opendaylight.yangtools.util.concurrent;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.fail;
+import static org.mockito.Mockito.mock;
 
 import com.google.common.base.Stopwatch;
-import com.google.common.collect.Lists;
 import com.google.common.util.concurrent.Uninterruptibles;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
@@ -25,6 +26,8 @@ import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import org.junit.After;
 import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * Unit tests for QueuedNotificationManager.
@@ -44,27 +47,27 @@ public class QueuedNotificationManagerTest {
         boolean cacheNotifications = true;
         String name;
 
-        TestListener( int expCount, int id ) {
+        TestListener(final int expCount, final int id) {
             name = "TestListener " + id;
-            actual = Collections.synchronizedList( Lists.newArrayListWithCapacity( expCount ) );
-            reset( expCount );
+            actual = Collections.synchronizedList(new ArrayList<>(expCount));
+            reset(expCount);
         }
 
-        void reset( int expCount ) {
+        void reset(final int expCount) {
             this.expCount = expCount;
-            latch = new CountDownLatch( expCount );
+            latch = new CountDownLatch(expCount);
             actual.clear();
         }
 
-        void onNotification( N data ) {
+        void onNotification(final N data) {
 
             try {
                 if (sleepTime > 0) {
-                    Uninterruptibles.sleepUninterruptibly( sleepTime, TimeUnit.MILLISECONDS );
+                    Uninterruptibles.sleepUninterruptibly( sleepTime, TimeUnit.MILLISECONDS);
                 }
 
                 if (cacheNotifications) {
-                    actual.add( data );
+                    actual.add(data);
                 }
 
                 RuntimeException localRuntimeEx = runtimeEx;
@@ -85,53 +88,52 @@ public class QueuedNotificationManagerTest {
         }
 
         void verifyNotifications() {
-            boolean done = Uninterruptibles.awaitUninterruptibly( latch, 10, TimeUnit.SECONDS );
+            boolean done = Uninterruptibles.awaitUninterruptibly(latch, 10, TimeUnit.SECONDS);
             if (!done) {
                 long actualCount = latch.getCount();
-                fail( name + ": Received " + (expCount - actualCount) +
-                      " notifications. Expected " + expCount );
+                fail(name + ": Received " + (expCount - actualCount) + " notifications. Expected " + expCount);
             }
         }
 
-        void verifyNotifications( List<N> expected ) {
+        void verifyNotifications(final List<N> expected) {
             verifyNotifications();
-            assertEquals( name + ": Notifications", Lists.newArrayList( expected ), actual );
+            assertEquals(name + ": Notifications", expected, actual);
         }
 
         // Implement bad hashCode/equals methods to verify it doesn't screw up the
         // QueuedNotificationManager as it should use reference identity.
         @Override
-        public int hashCode(){
+        public int hashCode() {
             return 1;
         }
 
         @Override
-        public boolean equals( Object obj ){
+        public boolean equals(final Object obj) {
             TestListener<?> other = (TestListener<?>) obj;
             return other != null;
         }
     }
 
     static class TestListener2<N> extends TestListener<N> {
-        TestListener2( int expCount, int id  ) {
+        TestListener2(final int expCount, final int id) {
             super(expCount, id);
         }
     }
 
     static class TestListener3<N> extends TestListener<N> {
-        TestListener3( int expCount, int id ) {
+        TestListener3(final int expCount, final int id) {
             super(expCount, id);
         }
     }
 
-    static class TestNotifier<N> implements QueuedNotificationManager.Invoker<TestListener<N>,N> {
-
+    static class TestNotifier<N> implements QueuedNotificationManager.Invoker<TestListener<N>, N> {
         @Override
-        public void invokeListener( TestListener<N> listener, N notification ) {
-            listener.onNotification( notification );
+        public void invokeListener(final TestListener<N> listener, final N notification) {
+            listener.onNotification(notification);
         }
     }
 
+    private static final Logger LOG = LoggerFactory.getLogger(QueuedNotificationManagerTest.class);
     private ExecutorService queueExecutor;
 
     @After
@@ -145,90 +147,84 @@ public class QueuedNotificationManagerTest {
     public void testNotificationsWithSingleListener() {
 
         queueExecutor = Executors.newFixedThreadPool( 2 );
-        NotificationManager<TestListener<Integer>, Integer> manager =
-                new QueuedNotificationManager<>( queueExecutor, new TestNotifier<>(),
-                10, "TestMgr" );
+        NotificationManager<TestListener<Integer>, Integer> manager = new QueuedNotificationManager<>(queueExecutor,
+                new TestNotifier<>(), 10, "TestMgr" );
 
         int initialCount = 6;
         int nNotifications = 100;
 
-        TestListener<Integer> listener = new TestListener<>( nNotifications, 1 );
+        TestListener<Integer> listener = new TestListener<>(nNotifications, 1);
         listener.sleepTime = 20;
 
-        manager.submitNotifications( listener, Arrays.asList( 1, 2 ) );
-        manager.submitNotification( listener, 3 );
-        manager.submitNotifications( listener, Arrays.asList( 4, 5 ) );
-        manager.submitNotification( listener, 6 );
+        manager.submitNotifications(listener, Arrays.asList(1, 2));
+        manager.submitNotification(listener, 3);
+        manager.submitNotifications(listener, Arrays.asList(4, 5));
+        manager.submitNotification(listener, 6);
 
-        manager.submitNotifications( null, Collections.emptyList() );
-        manager.submitNotifications( listener, null );
-        manager.submitNotification( listener, null );
+        manager.submitNotifications(null, Collections.emptyList());
+        manager.submitNotifications(listener, null);
+        manager.submitNotification(listener, null);
 
-        Uninterruptibles.sleepUninterruptibly( 100, TimeUnit.MILLISECONDS );
+        Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
 
         listener.sleepTime = 0;
 
-        List<Integer> expNotifications = Lists.newArrayListWithCapacity( nNotifications );
-        expNotifications.addAll( Arrays.asList( 1, 2, 3, 4, 5, 6 ) );
+        List<Integer> expNotifications = new ArrayList<>(nNotifications);
+        expNotifications.addAll(Arrays.asList(1, 2, 3, 4, 5, 6));
         for (int i = 1; i <= nNotifications - initialCount; i++) {
-            Integer v = Integer.valueOf( initialCount + i );
-            expNotifications.add( v );
-            manager.submitNotification( listener, v );
+            Integer v = Integer.valueOf(initialCount + i);
+            expNotifications.add(v);
+            manager.submitNotification(listener, v);
         }
 
         listener.verifyNotifications( expNotifications );
     }
 
     @Test
-    public void testNotificationsWithMultipleListeners() {
+    public void testNotificationsWithMultipleListeners() throws InterruptedException {
 
         int nListeners = 10;
-        queueExecutor = Executors.newFixedThreadPool( nListeners );
-        final ExecutorService stagingExecutor = Executors.newFixedThreadPool( nListeners );
-        final NotificationManager<TestListener<Integer>, Integer> manager =
-                new QueuedNotificationManager<>( queueExecutor, new TestNotifier<>(),
-                5000, "TestMgr" );
+        queueExecutor = Executors.newFixedThreadPool(nListeners);
+        final ExecutorService stagingExecutor = Executors.newFixedThreadPool(nListeners);
+        final NotificationManager<TestListener<Integer>, Integer> manager = new QueuedNotificationManager<>(
+                queueExecutor, new TestNotifier<>(), 5000, "TestMgr" );
 
         final int nNotifications = 100000;
 
-        System.out.println( "Testing " + nListeners + " listeners with " + nNotifications +
-                            " notifications each..." );
+        LOG.info("Testing {} listeners with {} notifications each...",  nListeners, nNotifications);
 
         final Integer[] notifications = new Integer[nNotifications];
         for (int i = 1; i <= nNotifications; i++) {
-            notifications[i-1] = Integer.valueOf( i );
+            notifications[i - 1] = Integer.valueOf(i);
         }
 
         Stopwatch stopWatch = Stopwatch.createStarted();
 
-        List<TestListener<Integer>> listeners = Lists.newArrayList();
+        List<TestListener<Integer>> listeners = new ArrayList<>();
+        List<Thread> threads = new ArrayList<>();
         for (int i = 1; i <= nListeners; i++) {
             final TestListener<Integer> listener =
                     i == 2 ? new TestListener2<>(nNotifications, i) :
                     i == 3 ? new TestListener3<>(nNotifications, i) :
                             new TestListener<>(nNotifications, i);
-            listeners.add( listener );
-
-            new Thread( new Runnable() {
-                @Override
-                public void run() {
-                    for (int j = 1; j <= nNotifications; j++) {
-                        final Integer n = notifications[j-1];
-                        stagingExecutor.execute( new Runnable() {
-                            @Override
-                            public void run() {
-                                manager.submitNotification( listener, n );
-                            }
-                        } );
-                    }
+            listeners.add(listener);
+
+            final Thread t = new Thread(() -> {
+                for (int j = 1; j <= nNotifications; j++) {
+                    final Integer n = notifications[j - 1];
+                    stagingExecutor.execute(() -> manager.submitNotification(listener, n));
                 }
-            } ).start();
+            });
+
+            t.start();
+            threads.add(t);
+
         }
 
         try {
             for (TestListener<Integer> listener: listeners) {
                 listener.verifyNotifications();
-                System.out.println( listener.name + " succeeded" );
+                LOG.info("{} succeeded", listener.name);
             }
         } finally {
             stagingExecutor.shutdownNow();
@@ -236,24 +232,28 @@ public class QueuedNotificationManagerTest {
 
         stopWatch.stop();
 
-        System.out.println( "Elapsed time: " + stopWatch );
-        System.out.println( queueExecutor );
+        LOG.info("Elapsed time: {}", stopWatch);
+        LOG.info("Executor: {}", queueExecutor);
+
+        for (Thread t : threads) {
+            t.join();
+        }
     }
 
     @Test(timeout=10000)
     public void testNotificationsWithListenerRuntimeEx() {
 
-        queueExecutor = Executors.newFixedThreadPool( 1 );
+        queueExecutor = Executors.newFixedThreadPool(1);
         NotificationManager<TestListener<Integer>, Integer> manager =
                 new QueuedNotificationManager<>( queueExecutor, new TestNotifier<>(),
                 10, "TestMgr" );
 
 
-        TestListener<Integer> listener = new TestListener<>( 2, 1 );
-        listener.runtimeEx = new RuntimeException( "mock" );
+        TestListener<Integer> listener = new TestListener<>(2, 1);
+        listener.runtimeEx = mock(RuntimeException.class);
 
-        manager.submitNotification( listener, 1 );
-        manager.submitNotification( listener, 2 );
+        manager.submitNotification(listener, 1);
+        manager.submitNotification(listener, 2);
 
         listener.verifyNotifications();
     }
@@ -261,37 +261,32 @@ public class QueuedNotificationManagerTest {
     @Test(timeout=10000)
     public void testNotificationsWithListenerJVMError() {
 
-        final CountDownLatch errorCaughtLatch = new CountDownLatch( 1 );
-        queueExecutor = new ThreadPoolExecutor( 1, 1, 0, TimeUnit.SECONDS,
-                new LinkedBlockingQueue<>() ) {
+        final CountDownLatch errorCaughtLatch = new CountDownLatch(1);
+        queueExecutor = new ThreadPoolExecutor(1, 1, 0, TimeUnit.SECONDS, new LinkedBlockingQueue<>()) {
              @Override
-             public void execute( final Runnable command ) {
-                 super.execute( new Runnable() {
-                    @Override
-                    public void run() {
-                        try {
-                            command.run();
-                        } catch( Error e ) {
-                            errorCaughtLatch.countDown();
-                        }
-                    }
-                });
+             public void execute(final Runnable command) {
+                 super.execute(() -> {
+                     try {
+                         command.run();
+                     } catch (Error e) {
+                         errorCaughtLatch.countDown();
+                     }
+                 });
              }
         };
 
-        NotificationManager<TestListener<Integer>, Integer> manager =
-                new QueuedNotificationManager<>( queueExecutor, new TestNotifier<>(),
-                10, "TestMgr" );
+        NotificationManager<TestListener<Integer>, Integer> manager = new QueuedNotificationManager<>(queueExecutor,
+                new TestNotifier<>(), 10, "TestMgr");
 
-        TestListener<Integer> listener = new TestListener<>( 2, 1 );
-        listener.jvmError = new Error( "mock" );
+        TestListener<Integer> listener = new TestListener<>(2, 1);
+        listener.jvmError = mock(Error.class);
 
-        manager.submitNotification( listener, 1 );
+        manager.submitNotification(listener, 1);
 
-        assertEquals( "JVM Error caught", true, Uninterruptibles.awaitUninterruptibly(
-                                                       errorCaughtLatch, 5, TimeUnit.SECONDS ) );
+        assertEquals("JVM Error caught", true, Uninterruptibles.awaitUninterruptibly(
+                                                       errorCaughtLatch, 5, TimeUnit.SECONDS));
 
-        manager.submitNotification( listener, 2 );
+        manager.submitNotification(listener, 2);
 
         listener.verifyNotifications();
     }