From 0ab9979b537056119507bbed776f1b5ff668844b Mon Sep 17 00:00:00 2001 From: Robert Varga Date: Wed, 21 Sep 2016 17:29:57 +0200 Subject: [PATCH] Allow QueuedNotificationManager to batch notifications This patch reworks the queueing logic so that notifications get completely read from the queue and then propagated to the listener invoker in one go. Change-Id: I1a4caaa75f7a206abf4ff5b6829918bd25d79551 Signed-off-by: Robert Varga (cherry picked from commit ddc17ff5daf9a58c5cf5f4ede3e49621c0db3d6c) --- .../concurrent/QueuedNotificationManager.java | 292 ++++++++++-------- .../QueuedNotificationManagerTest.java | 185 ++++++----- 2 files changed, 251 insertions(+), 226 deletions(-) diff --git a/common/util/src/main/java/org/opendaylight/yangtools/util/concurrent/QueuedNotificationManager.java b/common/util/src/main/java/org/opendaylight/yangtools/util/concurrent/QueuedNotificationManager.java index 21a63ca253..5160efe9bf 100644 --- a/common/util/src/main/java/org/opendaylight/yangtools/util/concurrent/QueuedNotificationManager.java +++ b/common/util/src/main/java/org/opendaylight/yangtools/util/concurrent/QueuedNotificationManager.java @@ -10,16 +10,18 @@ package org.opendaylight.yangtools.util.concurrent; import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableList; +import java.util.ArrayDeque; import java.util.Collection; import java.util.Collections; +import java.util.Iterator; import java.util.List; -import java.util.concurrent.BlockingQueue; +import java.util.Queue; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.Executor; -import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; import java.util.stream.Collectors; @@ -86,7 +88,9 @@ public class QueuedNotificationManager implements NotificationManager, NotificationTask> listenerCache = new ConcurrentHashMap<>(); private final BatchedInvoker listenerInvoker; @@ -116,8 +120,14 @@ public class QueuedNotificationManager implements NotificationManager listenerInvoker, final int maxQueueCapacity, final String name) { - this(executor, (BatchedInvoker)(l, c) -> c.forEach(n -> listenerInvoker.invokeListener(l, n)), - maxQueueCapacity, name); + this(executor, (BatchedInvoker)(l, c) -> c.forEach(n -> { + try { + listenerInvoker.invokeListener(l, n); + } catch (Exception e) { + LOG.error("{}: Error notifying listener {} with {}", name, l, n, e); + } + + }), maxQueueCapacity, name); Preconditions.checkNotNull(listenerInvoker); } @@ -134,6 +144,20 @@ public class QueuedNotificationManager implements NotificationManager(executor, listenerInvoker, maxQueueCapacity, name); } + /** + * Returns the maximum listener queue capacity. + */ + public int getMaxQueueCapacity() { + return maxQueueCapacity; + } + + /** + * Returns the {@link Executor} to used for notification tasks. + */ + public Executor getExecutor() { + return executor; + } + /* (non-Javadoc) * @see org.opendaylight.yangtools.util.concurrent.NotificationManager#addNotification(L, N) */ @@ -163,38 +187,47 @@ public class QueuedNotificationManager implements NotificationManager it = notifications.iterator(); while (true) { - final NotificationTask existingTask = listenerCache.get(key); - if (existingTask != null && existingTask.submitNotifications(notifications)) { - // We were able to add our notifications to an existing task so we're done. - break; + NotificationTask task = listenerCache.get(key); + if (task == null) { + // No task found, try to insert a new one + final NotificationTask newTask = new NotificationTask(key, it); + task = listenerCache.putIfAbsent(key, newTask); + if (task == null) { + // We were able to put our new task - now submit it to the executor and + // we're done. If it throws a RejectedExecutionException, let that propagate + // to the caller. + runTask(listener, newTask); + break; + } + + // We have a racing task, hence we can continue, but we need to refresh our iterator from + // the task. + it = newTask.recoverItems(); } - // Either there's no existing task or we couldn't add our notifications to the - // existing one because it's in the process of exiting and removing itself from - // the cache. Either way try to put a new task in the cache. If we can't put - // then either the existing one is still there and hasn't removed itself quite - // yet or some other concurrent thread beat us to the put although this method - // shouldn't be called concurrently for the same listener as that would violate - // notification ordering. In any case loop back up and try again. + final boolean completed = task.submitNotifications(it); + if (!completed) { + // Task is indicating it is exiting before it has consumed all the items and is exiting. Rather + // than spinning on removal, we try to replace it. + final NotificationTask newTask = new NotificationTask(key, it); + if (listenerCache.replace(key, task, newTask)) { + runTask(listener, newTask); + break; + } - if (newNotificationTask == null) { - newNotificationTask = new NotificationTask(key, notifications); - } - final NotificationTask oldTask = listenerCache.putIfAbsent(key, newNotificationTask); - if (oldTask == null) { - // We were able to put our new task - now submit it to the executor and - // we're done. If it throws a RejectedxecutionException, let that propagate - // to the caller. - - LOG.debug("{}: Submitting NotificationTask for listener {}", name, listener); - executor.execute(newNotificationTask); - break; + // We failed to replace the task, hence we need retry. Note we have to recover the items to be + // published from the new task. + it = newTask.recoverItems(); + LOG.debug("{}: retrying task queueing for {}", name, listener); + continue; } - LOG.debug("{}: retrying task queueing for {}", name, listener); + // All notifications have either been delivered or we have timed out and warned about the ones we + // have failed to deliver. In any case we are done here. + break; } } catch (InterruptedException e) { // We were interrupted trying to offer to the listener's queue. Somebody's probably @@ -202,7 +235,7 @@ public class QueuedNotificationManager implements NotificationManager implements NotificationManager getListenerNotificationQueueStats() { return listenerCache.values().stream().map(t -> new ListenerNotificationQueueStats(t.listenerKey.toString(), - t.notificationQueue.size())).collect(Collectors.toList()); + t.size())).collect(Collectors.toList()); } - /** - * Returns the maximum listener queue capacity. - */ - public int getMaxQueueCapacity() { - return maxQueueCapacity; - } - - /** - * Returns the {@link Executor} to used for notification tasks. - */ - public Executor getExecutor() { - return executor; + private void runTask(final L listener, final NotificationTask task) { + LOG.debug("{}: Submitting NotificationTask for listener {}", name, listener); + executor.execute(task); } /** @@ -270,70 +294,101 @@ public class QueuedNotificationManager implements NotificationManager notificationQueue; + + private final Lock lock = new ReentrantLock(); + private final Condition notEmpty = lock.newCondition(); + private final Condition notFull = lock.newCondition(); private final ListenerKey listenerKey; - @GuardedBy("queuingLock") - private boolean queuedNotifications = false; - private volatile boolean done = false; + @GuardedBy("lock") + private final Queue queue = new ArrayDeque<>(); + @GuardedBy("lock") + private boolean exiting; - NotificationTask(final ListenerKey listenerKey, final Iterable notifications) { + NotificationTask(final ListenerKey listenerKey, final Iterator notifications) { this.listenerKey = Preconditions.checkNotNull(listenerKey); - this.notificationQueue = new LinkedBlockingQueue<>(maxQueueCapacity); + while (notifications.hasNext()) { + queue.offer(notifications.next()); + } + } - for (N notification: notifications) { - this.notificationQueue.add(notification); + Iterator recoverItems() { + // This violates @GuardedBy annotation, but is invoked only when the task is not started and will never + // get started, hence this is safe. + return queue.iterator(); + } + + int size() { + lock.lock(); + try { + return queue.size(); + } finally { + lock.unlock(); } } - @GuardedBy("queuingLock") - private void publishNotification(final N notification) throws InterruptedException { - // The offer is attempted for up to 10 minutes, with a status message printed each minute - for (int notificationOfferAttempts = 0; - notificationOfferAttempts < MAX_NOTIFICATION_OFFER_ATTEMPTS; notificationOfferAttempts++) { + boolean submitNotifications(final Iterator notifications) throws InterruptedException { + final long start = System.nanoTime(); + final long deadline = start + GIVE_UP_NANOS; - // Try to offer for up to a minute and log a message if it times out. - LOG.debug("{}: Offering notification to the queue for listener {}: {}", name, listenerKey, - notification); + lock.lock(); + try { + // Lock may have blocked for some time, we need to take that into account. We may have exceedded + // the deadline, but that is unlikely and even in that case we can make some progress without further + // blocking. + long canWait = deadline - System.nanoTime(); - if (notificationQueue.offer(notification, 1, TimeUnit.MINUTES)) { - return; - } + while (true) { + // Check the exiting flag - if true then #run is in the process of exiting so return + // false to indicate such. Otherwise, offer the notifications to the queue. + if (exiting) { + return false; + } - LOG.warn("{}: Timed out trying to offer a notification to the queue for listener {} " - + "on attempt {} of {}. The queue has reached its capacity of {}", name, listenerKey, - notificationOfferAttempts, MAX_NOTIFICATION_OFFER_ATTEMPTS, maxQueueCapacity); - } + final int avail = maxQueueCapacity - queue.size(); + if (avail <= 0) { + if (canWait <= 0) { + LOG.warn("{}: Failed to offer notifications {} to the queue for listener {}. Exceeded" + + "maximum allowable time of {} minutes; the listener is likely in an unrecoverable" + + "state (deadlock or endless loop). ", name, ImmutableList.copyOf(notifications), + listenerKey, MAX_NOTIFICATION_OFFER_MINUTES); + return true; + } - LOG.warn("{}: Failed to offer a notification to the queue for listener {}. Exceeded max allowable attempts" - + " of {} in {} minutes; the listener is likely in an unrecoverable state (deadlock or endless" - + " loop).", name, listenerKey, MAX_NOTIFICATION_OFFER_ATTEMPTS, MAX_NOTIFICATION_OFFER_ATTEMPTS); - } + canWait = notFull.awaitNanos(canWait); + continue; + } - boolean submitNotifications(final Iterable notifications) throws InterruptedException { + for (int i = 0; i < avail; ++i) { + if (!notifications.hasNext()) { + notEmpty.signal(); + return true; + } - queuingLock.lock(); - try { + queue.offer(notifications.next()); + } + } + } finally { + lock.unlock(); + } + } - // Check the done flag - if true then #run is in the process of exiting so return - // false to indicate such. Otherwise, offer the notifications to the queue. + @GuardedBy("lock") + private boolean waitForQueue() { + long timeout = TASK_WAIT_NANOS; - if (done) { + while (queue.isEmpty()) { + if (timeout <= 0) { return false; } - for (N notification : notifications) { - publishNotification(notification); + try { + timeout = notEmpty.awaitNanos(timeout); + } catch (InterruptedException e) { + // The executor is probably shutting down so log as debug. + LOG.debug("{}: Interrupted trying to remove from {} listener's queue", name, listenerKey); + return false; } - - // Set the queuedNotifications flag to tell #run that we've just queued - // notifications and not to exit yet, even if it thinks the queue is empty at this - // point. - - queuedNotifications = true; - } finally { - queuingLock.unlock(); } return true; @@ -343,48 +398,27 @@ public class QueuedNotificationManager implements NotificationManager notifications; - // The queue is empty - try to get the queuingLock. If we can't get the lock - // then #submitNotifications is in the process of offering to the queue so - // we'll loop back up and poll the queue again. - - if (queuingLock.tryLock()) { - try { - - // Check the queuedNotifications flag to see if #submitNotifications - // has offered new notification(s) to the queue. If so, loop back up - // and poll the queue again. Otherwise set done to true and exit. - // Once we set the done flag and unlock, calls to - // #submitNotifications will fail and a new task will be created. - - if (!queuedNotifications) { - done = true; - break; - } - - // Clear the queuedNotifications flag so we'll try to exit the next - // time through the loop when the queue is empty. + lock.lock(); + try { + if (!waitForQueue()) { + exiting = true; + break; + } - queuedNotifications = false; + // Splice the entire queue + notifications = ImmutableList.copyOf(queue); + queue.clear(); - } finally { - queuingLock.unlock(); - } - } + notFull.signalAll(); + } finally { + lock.unlock(); } - notifyListener(notification); + invokeListener(notifications); } - } catch (InterruptedException e) { - // The executor is probably shutting down so log as debug. - LOG.debug("{}: Interrupted trying to remove from {} listener's queue", name, listenerKey); } finally { // We're exiting, gracefully or not - either way make sure we always remove // ourselves from the cache. @@ -392,17 +426,13 @@ public class QueuedNotificationManager implements NotificationManager notifications) { + LOG.debug("{}: Invoking listener {} with notification: {}", name, listenerKey, notifications); try { - listenerInvoker.invokeListener(listenerKey.getListener(), ImmutableList.of(notification)); + listenerInvoker.invokeListener(listenerKey.getListener(), notifications); } catch (Exception e) { // We'll let a RuntimeException from the listener slide and keep sending any remaining notifications. - LOG.error(String.format("%1$s: Error notifying listener %2$s", name, listenerKey), e); + LOG.error("{}: Error notifying listener {} with {}", name, listenerKey, notifications, e); } } } diff --git a/common/util/src/test/java/org/opendaylight/yangtools/util/concurrent/QueuedNotificationManagerTest.java b/common/util/src/test/java/org/opendaylight/yangtools/util/concurrent/QueuedNotificationManagerTest.java index 3833b93873..f3275802c5 100644 --- a/common/util/src/test/java/org/opendaylight/yangtools/util/concurrent/QueuedNotificationManagerTest.java +++ b/common/util/src/test/java/org/opendaylight/yangtools/util/concurrent/QueuedNotificationManagerTest.java @@ -10,10 +10,11 @@ package org.opendaylight.yangtools.util.concurrent; import static org.junit.Assert.assertEquals; import static org.junit.Assert.fail; +import static org.mockito.Mockito.mock; import com.google.common.base.Stopwatch; -import com.google.common.collect.Lists; import com.google.common.util.concurrent.Uninterruptibles; +import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.List; @@ -25,6 +26,8 @@ import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import org.junit.After; import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * Unit tests for QueuedNotificationManager. @@ -44,27 +47,27 @@ public class QueuedNotificationManagerTest { boolean cacheNotifications = true; String name; - TestListener( int expCount, int id ) { + TestListener(final int expCount, final int id) { name = "TestListener " + id; - actual = Collections.synchronizedList( Lists.newArrayListWithCapacity( expCount ) ); - reset( expCount ); + actual = Collections.synchronizedList(new ArrayList<>(expCount)); + reset(expCount); } - void reset( int expCount ) { + void reset(final int expCount) { this.expCount = expCount; - latch = new CountDownLatch( expCount ); + latch = new CountDownLatch(expCount); actual.clear(); } - void onNotification( N data ) { + void onNotification(final N data) { try { if (sleepTime > 0) { - Uninterruptibles.sleepUninterruptibly( sleepTime, TimeUnit.MILLISECONDS ); + Uninterruptibles.sleepUninterruptibly( sleepTime, TimeUnit.MILLISECONDS); } if (cacheNotifications) { - actual.add( data ); + actual.add(data); } RuntimeException localRuntimeEx = runtimeEx; @@ -85,53 +88,52 @@ public class QueuedNotificationManagerTest { } void verifyNotifications() { - boolean done = Uninterruptibles.awaitUninterruptibly( latch, 10, TimeUnit.SECONDS ); + boolean done = Uninterruptibles.awaitUninterruptibly(latch, 10, TimeUnit.SECONDS); if (!done) { long actualCount = latch.getCount(); - fail( name + ": Received " + (expCount - actualCount) + - " notifications. Expected " + expCount ); + fail(name + ": Received " + (expCount - actualCount) + " notifications. Expected " + expCount); } } - void verifyNotifications( List expected ) { + void verifyNotifications(final List expected) { verifyNotifications(); - assertEquals( name + ": Notifications", Lists.newArrayList( expected ), actual ); + assertEquals(name + ": Notifications", expected, actual); } // Implement bad hashCode/equals methods to verify it doesn't screw up the // QueuedNotificationManager as it should use reference identity. @Override - public int hashCode(){ + public int hashCode() { return 1; } @Override - public boolean equals( Object obj ){ + public boolean equals(final Object obj) { TestListener other = (TestListener) obj; return other != null; } } static class TestListener2 extends TestListener { - TestListener2( int expCount, int id ) { + TestListener2(final int expCount, final int id) { super(expCount, id); } } static class TestListener3 extends TestListener { - TestListener3( int expCount, int id ) { + TestListener3(final int expCount, final int id) { super(expCount, id); } } - static class TestNotifier implements QueuedNotificationManager.Invoker,N> { - + static class TestNotifier implements QueuedNotificationManager.Invoker, N> { @Override - public void invokeListener( TestListener listener, N notification ) { - listener.onNotification( notification ); + public void invokeListener(final TestListener listener, final N notification) { + listener.onNotification(notification); } } + private static final Logger LOG = LoggerFactory.getLogger(QueuedNotificationManagerTest.class); private ExecutorService queueExecutor; @After @@ -145,90 +147,84 @@ public class QueuedNotificationManagerTest { public void testNotificationsWithSingleListener() { queueExecutor = Executors.newFixedThreadPool( 2 ); - NotificationManager, Integer> manager = - new QueuedNotificationManager<>( queueExecutor, new TestNotifier<>(), - 10, "TestMgr" ); + NotificationManager, Integer> manager = new QueuedNotificationManager<>(queueExecutor, + new TestNotifier<>(), 10, "TestMgr" ); int initialCount = 6; int nNotifications = 100; - TestListener listener = new TestListener<>( nNotifications, 1 ); + TestListener listener = new TestListener<>(nNotifications, 1); listener.sleepTime = 20; - manager.submitNotifications( listener, Arrays.asList( 1, 2 ) ); - manager.submitNotification( listener, 3 ); - manager.submitNotifications( listener, Arrays.asList( 4, 5 ) ); - manager.submitNotification( listener, 6 ); + manager.submitNotifications(listener, Arrays.asList(1, 2)); + manager.submitNotification(listener, 3); + manager.submitNotifications(listener, Arrays.asList(4, 5)); + manager.submitNotification(listener, 6); - manager.submitNotifications( null, Collections.emptyList() ); - manager.submitNotifications( listener, null ); - manager.submitNotification( listener, null ); + manager.submitNotifications(null, Collections.emptyList()); + manager.submitNotifications(listener, null); + manager.submitNotification(listener, null); - Uninterruptibles.sleepUninterruptibly( 100, TimeUnit.MILLISECONDS ); + Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS); listener.sleepTime = 0; - List expNotifications = Lists.newArrayListWithCapacity( nNotifications ); - expNotifications.addAll( Arrays.asList( 1, 2, 3, 4, 5, 6 ) ); + List expNotifications = new ArrayList<>(nNotifications); + expNotifications.addAll(Arrays.asList(1, 2, 3, 4, 5, 6)); for (int i = 1; i <= nNotifications - initialCount; i++) { - Integer v = Integer.valueOf( initialCount + i ); - expNotifications.add( v ); - manager.submitNotification( listener, v ); + Integer v = Integer.valueOf(initialCount + i); + expNotifications.add(v); + manager.submitNotification(listener, v); } listener.verifyNotifications( expNotifications ); } @Test - public void testNotificationsWithMultipleListeners() { + public void testNotificationsWithMultipleListeners() throws InterruptedException { int nListeners = 10; - queueExecutor = Executors.newFixedThreadPool( nListeners ); - final ExecutorService stagingExecutor = Executors.newFixedThreadPool( nListeners ); - final NotificationManager, Integer> manager = - new QueuedNotificationManager<>( queueExecutor, new TestNotifier<>(), - 5000, "TestMgr" ); + queueExecutor = Executors.newFixedThreadPool(nListeners); + final ExecutorService stagingExecutor = Executors.newFixedThreadPool(nListeners); + final NotificationManager, Integer> manager = new QueuedNotificationManager<>( + queueExecutor, new TestNotifier<>(), 5000, "TestMgr" ); final int nNotifications = 100000; - System.out.println( "Testing " + nListeners + " listeners with " + nNotifications + - " notifications each..." ); + LOG.info("Testing {} listeners with {} notifications each...", nListeners, nNotifications); final Integer[] notifications = new Integer[nNotifications]; for (int i = 1; i <= nNotifications; i++) { - notifications[i-1] = Integer.valueOf( i ); + notifications[i - 1] = Integer.valueOf(i); } Stopwatch stopWatch = Stopwatch.createStarted(); - List> listeners = Lists.newArrayList(); + List> listeners = new ArrayList<>(); + List threads = new ArrayList<>(); for (int i = 1; i <= nListeners; i++) { final TestListener listener = i == 2 ? new TestListener2<>(nNotifications, i) : i == 3 ? new TestListener3<>(nNotifications, i) : new TestListener<>(nNotifications, i); - listeners.add( listener ); - - new Thread( new Runnable() { - @Override - public void run() { - for (int j = 1; j <= nNotifications; j++) { - final Integer n = notifications[j-1]; - stagingExecutor.execute( new Runnable() { - @Override - public void run() { - manager.submitNotification( listener, n ); - } - } ); - } + listeners.add(listener); + + final Thread t = new Thread(() -> { + for (int j = 1; j <= nNotifications; j++) { + final Integer n = notifications[j - 1]; + stagingExecutor.execute(() -> manager.submitNotification(listener, n)); } - } ).start(); + }); + + t.start(); + threads.add(t); + } try { for (TestListener listener: listeners) { listener.verifyNotifications(); - System.out.println( listener.name + " succeeded" ); + LOG.info("{} succeeded", listener.name); } } finally { stagingExecutor.shutdownNow(); @@ -236,24 +232,28 @@ public class QueuedNotificationManagerTest { stopWatch.stop(); - System.out.println( "Elapsed time: " + stopWatch ); - System.out.println( queueExecutor ); + LOG.info("Elapsed time: {}", stopWatch); + LOG.info("Executor: {}", queueExecutor); + + for (Thread t : threads) { + t.join(); + } } @Test(timeout=10000) public void testNotificationsWithListenerRuntimeEx() { - queueExecutor = Executors.newFixedThreadPool( 1 ); + queueExecutor = Executors.newFixedThreadPool(1); NotificationManager, Integer> manager = new QueuedNotificationManager<>( queueExecutor, new TestNotifier<>(), 10, "TestMgr" ); - TestListener listener = new TestListener<>( 2, 1 ); - listener.runtimeEx = new RuntimeException( "mock" ); + TestListener listener = new TestListener<>(2, 1); + listener.runtimeEx = mock(RuntimeException.class); - manager.submitNotification( listener, 1 ); - manager.submitNotification( listener, 2 ); + manager.submitNotification(listener, 1); + manager.submitNotification(listener, 2); listener.verifyNotifications(); } @@ -261,37 +261,32 @@ public class QueuedNotificationManagerTest { @Test(timeout=10000) public void testNotificationsWithListenerJVMError() { - final CountDownLatch errorCaughtLatch = new CountDownLatch( 1 ); - queueExecutor = new ThreadPoolExecutor( 1, 1, 0, TimeUnit.SECONDS, - new LinkedBlockingQueue<>() ) { + final CountDownLatch errorCaughtLatch = new CountDownLatch(1); + queueExecutor = new ThreadPoolExecutor(1, 1, 0, TimeUnit.SECONDS, new LinkedBlockingQueue<>()) { @Override - public void execute( final Runnable command ) { - super.execute( new Runnable() { - @Override - public void run() { - try { - command.run(); - } catch( Error e ) { - errorCaughtLatch.countDown(); - } - } - }); + public void execute(final Runnable command) { + super.execute(() -> { + try { + command.run(); + } catch (Error e) { + errorCaughtLatch.countDown(); + } + }); } }; - NotificationManager, Integer> manager = - new QueuedNotificationManager<>( queueExecutor, new TestNotifier<>(), - 10, "TestMgr" ); + NotificationManager, Integer> manager = new QueuedNotificationManager<>(queueExecutor, + new TestNotifier<>(), 10, "TestMgr"); - TestListener listener = new TestListener<>( 2, 1 ); - listener.jvmError = new Error( "mock" ); + TestListener listener = new TestListener<>(2, 1); + listener.jvmError = mock(Error.class); - manager.submitNotification( listener, 1 ); + manager.submitNotification(listener, 1); - assertEquals( "JVM Error caught", true, Uninterruptibles.awaitUninterruptibly( - errorCaughtLatch, 5, TimeUnit.SECONDS ) ); + assertEquals("JVM Error caught", true, Uninterruptibles.awaitUninterruptibly( + errorCaughtLatch, 5, TimeUnit.SECONDS)); - manager.submitNotification( listener, 2 ); + manager.submitNotification(listener, 2); listener.verifyNotifications(); } -- 2.36.6