import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
+import java.util.ArrayDeque;
import java.util.Collection;
import java.util.Collections;
+import java.util.Iterator;
import java.util.List;
-import java.util.concurrent.BlockingQueue;
+import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executor;
-import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Collectors;
* Caps the maximum number of attempts to offer notification to a particular listener. Each
* attempt window is 1 minute, so an offer times out after roughly 10 minutes.
*/
- private static final int MAX_NOTIFICATION_OFFER_ATTEMPTS = 10;
+ private static final int MAX_NOTIFICATION_OFFER_MINUTES = 10;
+ private static final long GIVE_UP_NANOS = TimeUnit.MINUTES.toNanos(MAX_NOTIFICATION_OFFER_MINUTES);
+ private static final long TASK_WAIT_NANOS = TimeUnit.MILLISECONDS.toNanos(10);
private final ConcurrentMap<ListenerKey<L>, NotificationTask> listenerCache = new ConcurrentHashMap<>();
private final BatchedInvoker<L, N> listenerInvoker;
@Deprecated
public QueuedNotificationManager(final Executor executor, final Invoker<L, N> listenerInvoker,
final int maxQueueCapacity, final String name) {
- this(executor, (BatchedInvoker<L, N>)(l, c) -> c.forEach(n -> listenerInvoker.invokeListener(l, n)),
- maxQueueCapacity, name);
+ this(executor, (BatchedInvoker<L, N>)(l, c) -> c.forEach(n -> {
+ try {
+ listenerInvoker.invokeListener(l, n);
+ } catch (Exception e) {
+ LOG.error("{}: Error notifying listener {} with {}", name, l, n, e);
+ }
+
+ }), maxQueueCapacity, name);
Preconditions.checkNotNull(listenerInvoker);
}
return new QueuedNotificationManager<>(executor, listenerInvoker, maxQueueCapacity, name);
}
+ /**
+ * Returns the maximum listener queue capacity.
+ */
+ public int getMaxQueueCapacity() {
+ return maxQueueCapacity;
+ }
+
+ /**
+ * Returns the {@link Executor} to used for notification tasks.
+ */
+ public Executor getExecutor() {
+ return executor;
+ }
+
/* (non-Javadoc)
* @see org.opendaylight.yangtools.util.concurrent.NotificationManager#addNotification(L, N)
*/
// add our notifications to an existing NotificationTask. Eventually one or the other
// will occur.
try {
- NotificationTask newNotificationTask = null;
+ Iterator<N> it = notifications.iterator();
while (true) {
- final NotificationTask existingTask = listenerCache.get(key);
- if (existingTask != null && existingTask.submitNotifications(notifications)) {
- // We were able to add our notifications to an existing task so we're done.
- break;
+ NotificationTask task = listenerCache.get(key);
+ if (task == null) {
+ // No task found, try to insert a new one
+ final NotificationTask newTask = new NotificationTask(key, it);
+ task = listenerCache.putIfAbsent(key, newTask);
+ if (task == null) {
+ // We were able to put our new task - now submit it to the executor and
+ // we're done. If it throws a RejectedExecutionException, let that propagate
+ // to the caller.
+ runTask(listener, newTask);
+ break;
+ }
+
+ // We have a racing task, hence we can continue, but we need to refresh our iterator from
+ // the task.
+ it = newTask.recoverItems();
}
- // Either there's no existing task or we couldn't add our notifications to the
- // existing one because it's in the process of exiting and removing itself from
- // the cache. Either way try to put a new task in the cache. If we can't put
- // then either the existing one is still there and hasn't removed itself quite
- // yet or some other concurrent thread beat us to the put although this method
- // shouldn't be called concurrently for the same listener as that would violate
- // notification ordering. In any case loop back up and try again.
+ final boolean completed = task.submitNotifications(it);
+ if (!completed) {
+ // Task is indicating it is exiting before it has consumed all the items and is exiting. Rather
+ // than spinning on removal, we try to replace it.
+ final NotificationTask newTask = new NotificationTask(key, it);
+ if (listenerCache.replace(key, task, newTask)) {
+ runTask(listener, newTask);
+ break;
+ }
- if (newNotificationTask == null) {
- newNotificationTask = new NotificationTask(key, notifications);
- }
- final NotificationTask oldTask = listenerCache.putIfAbsent(key, newNotificationTask);
- if (oldTask == null) {
- // We were able to put our new task - now submit it to the executor and
- // we're done. If it throws a RejectedxecutionException, let that propagate
- // to the caller.
-
- LOG.debug("{}: Submitting NotificationTask for listener {}", name, listener);
- executor.execute(newNotificationTask);
- break;
+ // We failed to replace the task, hence we need retry. Note we have to recover the items to be
+ // published from the new task.
+ it = newTask.recoverItems();
+ LOG.debug("{}: retrying task queueing for {}", name, listener);
+ continue;
}
- LOG.debug("{}: retrying task queueing for {}", name, listener);
+ // All notifications have either been delivered or we have timed out and warned about the ones we
+ // have failed to deliver. In any case we are done here.
+ break;
}
} catch (InterruptedException e) {
// We were interrupted trying to offer to the listener's queue. Somebody's probably
LOG.warn("{}: Interrupted trying to add to {} listener's queue", name, listener);
}
- LOG.trace("{}: submitNotifications dine for listener {}", name, listener);
+ LOG.trace("{}: submitNotifications done for listener {}", name, listener);
}
/**
*/
public List<ListenerNotificationQueueStats> getListenerNotificationQueueStats() {
return listenerCache.values().stream().map(t -> new ListenerNotificationQueueStats(t.listenerKey.toString(),
- t.notificationQueue.size())).collect(Collectors.toList());
+ t.size())).collect(Collectors.toList());
}
- /**
- * Returns the maximum listener queue capacity.
- */
- public int getMaxQueueCapacity() {
- return maxQueueCapacity;
- }
-
- /**
- * Returns the {@link Executor} to used for notification tasks.
- */
- public Executor getExecutor() {
- return executor;
+ private void runTask(final L listener, final NotificationTask task) {
+ LOG.debug("{}: Submitting NotificationTask for listener {}", name, listener);
+ executor.execute(task);
}
/**
* listener.
*/
private class NotificationTask implements Runnable {
- private final Lock queuingLock = new ReentrantLock();
- private final BlockingQueue<N> notificationQueue;
+
+ private final Lock lock = new ReentrantLock();
+ private final Condition notEmpty = lock.newCondition();
+ private final Condition notFull = lock.newCondition();
private final ListenerKey<L> listenerKey;
- @GuardedBy("queuingLock")
- private boolean queuedNotifications = false;
- private volatile boolean done = false;
+ @GuardedBy("lock")
+ private final Queue<N> queue = new ArrayDeque<>();
+ @GuardedBy("lock")
+ private boolean exiting;
- NotificationTask(final ListenerKey<L> listenerKey, final Iterable<N> notifications) {
+ NotificationTask(final ListenerKey<L> listenerKey, final Iterator<N> notifications) {
this.listenerKey = Preconditions.checkNotNull(listenerKey);
- this.notificationQueue = new LinkedBlockingQueue<>(maxQueueCapacity);
+ while (notifications.hasNext()) {
+ queue.offer(notifications.next());
+ }
+ }
- for (N notification: notifications) {
- this.notificationQueue.add(notification);
+ Iterator<N> recoverItems() {
+ // This violates @GuardedBy annotation, but is invoked only when the task is not started and will never
+ // get started, hence this is safe.
+ return queue.iterator();
+ }
+
+ int size() {
+ lock.lock();
+ try {
+ return queue.size();
+ } finally {
+ lock.unlock();
}
}
- @GuardedBy("queuingLock")
- private void publishNotification(final N notification) throws InterruptedException {
- // The offer is attempted for up to 10 minutes, with a status message printed each minute
- for (int notificationOfferAttempts = 0;
- notificationOfferAttempts < MAX_NOTIFICATION_OFFER_ATTEMPTS; notificationOfferAttempts++) {
+ boolean submitNotifications(final Iterator<N> notifications) throws InterruptedException {
+ final long start = System.nanoTime();
+ final long deadline = start + GIVE_UP_NANOS;
- // Try to offer for up to a minute and log a message if it times out.
- LOG.debug("{}: Offering notification to the queue for listener {}: {}", name, listenerKey,
- notification);
+ lock.lock();
+ try {
+ // Lock may have blocked for some time, we need to take that into account. We may have exceedded
+ // the deadline, but that is unlikely and even in that case we can make some progress without further
+ // blocking.
+ long canWait = deadline - System.nanoTime();
- if (notificationQueue.offer(notification, 1, TimeUnit.MINUTES)) {
- return;
- }
+ while (true) {
+ // Check the exiting flag - if true then #run is in the process of exiting so return
+ // false to indicate such. Otherwise, offer the notifications to the queue.
+ if (exiting) {
+ return false;
+ }
- LOG.warn("{}: Timed out trying to offer a notification to the queue for listener {} "
- + "on attempt {} of {}. The queue has reached its capacity of {}", name, listenerKey,
- notificationOfferAttempts, MAX_NOTIFICATION_OFFER_ATTEMPTS, maxQueueCapacity);
- }
+ final int avail = maxQueueCapacity - queue.size();
+ if (avail <= 0) {
+ if (canWait <= 0) {
+ LOG.warn("{}: Failed to offer notifications {} to the queue for listener {}. Exceeded"
+ + "maximum allowable time of {} minutes; the listener is likely in an unrecoverable"
+ + "state (deadlock or endless loop). ", name, ImmutableList.copyOf(notifications),
+ listenerKey, MAX_NOTIFICATION_OFFER_MINUTES);
+ return true;
+ }
- LOG.warn("{}: Failed to offer a notification to the queue for listener {}. Exceeded max allowable attempts"
- + " of {} in {} minutes; the listener is likely in an unrecoverable state (deadlock or endless"
- + " loop).", name, listenerKey, MAX_NOTIFICATION_OFFER_ATTEMPTS, MAX_NOTIFICATION_OFFER_ATTEMPTS);
- }
+ canWait = notFull.awaitNanos(canWait);
+ continue;
+ }
- boolean submitNotifications(final Iterable<N> notifications) throws InterruptedException {
+ for (int i = 0; i < avail; ++i) {
+ if (!notifications.hasNext()) {
+ notEmpty.signal();
+ return true;
+ }
- queuingLock.lock();
- try {
+ queue.offer(notifications.next());
+ }
+ }
+ } finally {
+ lock.unlock();
+ }
+ }
- // Check the done flag - if true then #run is in the process of exiting so return
- // false to indicate such. Otherwise, offer the notifications to the queue.
+ @GuardedBy("lock")
+ private boolean waitForQueue() {
+ long timeout = TASK_WAIT_NANOS;
- if (done) {
+ while (queue.isEmpty()) {
+ if (timeout <= 0) {
return false;
}
- for (N notification : notifications) {
- publishNotification(notification);
+ try {
+ timeout = notEmpty.awaitNanos(timeout);
+ } catch (InterruptedException e) {
+ // The executor is probably shutting down so log as debug.
+ LOG.debug("{}: Interrupted trying to remove from {} listener's queue", name, listenerKey);
+ return false;
}
-
- // Set the queuedNotifications flag to tell #run that we've just queued
- // notifications and not to exit yet, even if it thinks the queue is empty at this
- // point.
-
- queuedNotifications = true;
- } finally {
- queuingLock.unlock();
}
return true;
public void run() {
try {
// Loop until we've dispatched all the notifications in the queue.
-
while (true) {
- // Get the notification at the head of the queue, waiting a little bit for one
- // to get offered.
-
- final N notification = notificationQueue.poll(10, TimeUnit.MILLISECONDS);
- if (notification == null) {
+ final Collection<N> notifications;
- // The queue is empty - try to get the queuingLock. If we can't get the lock
- // then #submitNotifications is in the process of offering to the queue so
- // we'll loop back up and poll the queue again.
-
- if (queuingLock.tryLock()) {
- try {
-
- // Check the queuedNotifications flag to see if #submitNotifications
- // has offered new notification(s) to the queue. If so, loop back up
- // and poll the queue again. Otherwise set done to true and exit.
- // Once we set the done flag and unlock, calls to
- // #submitNotifications will fail and a new task will be created.
-
- if (!queuedNotifications) {
- done = true;
- break;
- }
-
- // Clear the queuedNotifications flag so we'll try to exit the next
- // time through the loop when the queue is empty.
+ lock.lock();
+ try {
+ if (!waitForQueue()) {
+ exiting = true;
+ break;
+ }
- queuedNotifications = false;
+ // Splice the entire queue
+ notifications = ImmutableList.copyOf(queue);
+ queue.clear();
- } finally {
- queuingLock.unlock();
- }
- }
+ notFull.signalAll();
+ } finally {
+ lock.unlock();
}
- notifyListener(notification);
+ invokeListener(notifications);
}
- } catch (InterruptedException e) {
- // The executor is probably shutting down so log as debug.
- LOG.debug("{}: Interrupted trying to remove from {} listener's queue", name, listenerKey);
} finally {
// We're exiting, gracefully or not - either way make sure we always remove
// ourselves from the cache.
}
}
- private void notifyListener(final N notification) {
- if (notification == null) {
- return;
- }
-
- LOG.debug("{}: Invoking listener {} with notification: {}", name, listenerKey, notification);
+ private void invokeListener(final Collection<N> notifications) {
+ LOG.debug("{}: Invoking listener {} with notification: {}", name, listenerKey, notifications);
try {
- listenerInvoker.invokeListener(listenerKey.getListener(), ImmutableList.of(notification));
+ listenerInvoker.invokeListener(listenerKey.getListener(), notifications);
} catch (Exception e) {
// We'll let a RuntimeException from the listener slide and keep sending any remaining notifications.
- LOG.error(String.format("%1$s: Error notifying listener %2$s", name, listenerKey), e);
+ LOG.error("{}: Error notifying listener {} with {}", name, listenerKey, notifications, e);
}
}
}
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;
+import static org.mockito.Mockito.mock;
import com.google.common.base.Stopwatch;
-import com.google.common.collect.Lists;
import com.google.common.util.concurrent.Uninterruptibles;
+import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.TimeUnit;
import org.junit.After;
import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* Unit tests for QueuedNotificationManager.
boolean cacheNotifications = true;
String name;
- TestListener( int expCount, int id ) {
+ TestListener(final int expCount, final int id) {
name = "TestListener " + id;
- actual = Collections.synchronizedList( Lists.newArrayListWithCapacity( expCount ) );
- reset( expCount );
+ actual = Collections.synchronizedList(new ArrayList<>(expCount));
+ reset(expCount);
}
- void reset( int expCount ) {
+ void reset(final int expCount) {
this.expCount = expCount;
- latch = new CountDownLatch( expCount );
+ latch = new CountDownLatch(expCount);
actual.clear();
}
- void onNotification( N data ) {
+ void onNotification(final N data) {
try {
if (sleepTime > 0) {
- Uninterruptibles.sleepUninterruptibly( sleepTime, TimeUnit.MILLISECONDS );
+ Uninterruptibles.sleepUninterruptibly( sleepTime, TimeUnit.MILLISECONDS);
}
if (cacheNotifications) {
- actual.add( data );
+ actual.add(data);
}
RuntimeException localRuntimeEx = runtimeEx;
}
void verifyNotifications() {
- boolean done = Uninterruptibles.awaitUninterruptibly( latch, 10, TimeUnit.SECONDS );
+ boolean done = Uninterruptibles.awaitUninterruptibly(latch, 10, TimeUnit.SECONDS);
if (!done) {
long actualCount = latch.getCount();
- fail( name + ": Received " + (expCount - actualCount) +
- " notifications. Expected " + expCount );
+ fail(name + ": Received " + (expCount - actualCount) + " notifications. Expected " + expCount);
}
}
- void verifyNotifications( List<N> expected ) {
+ void verifyNotifications(final List<N> expected) {
verifyNotifications();
- assertEquals( name + ": Notifications", Lists.newArrayList( expected ), actual );
+ assertEquals(name + ": Notifications", expected, actual);
}
// Implement bad hashCode/equals methods to verify it doesn't screw up the
// QueuedNotificationManager as it should use reference identity.
@Override
- public int hashCode(){
+ public int hashCode() {
return 1;
}
@Override
- public boolean equals( Object obj ){
+ public boolean equals(final Object obj) {
TestListener<?> other = (TestListener<?>) obj;
return other != null;
}
}
static class TestListener2<N> extends TestListener<N> {
- TestListener2( int expCount, int id ) {
+ TestListener2(final int expCount, final int id) {
super(expCount, id);
}
}
static class TestListener3<N> extends TestListener<N> {
- TestListener3( int expCount, int id ) {
+ TestListener3(final int expCount, final int id) {
super(expCount, id);
}
}
- static class TestNotifier<N> implements QueuedNotificationManager.Invoker<TestListener<N>,N> {
-
+ static class TestNotifier<N> implements QueuedNotificationManager.Invoker<TestListener<N>, N> {
@Override
- public void invokeListener( TestListener<N> listener, N notification ) {
- listener.onNotification( notification );
+ public void invokeListener(final TestListener<N> listener, final N notification) {
+ listener.onNotification(notification);
}
}
+ private static final Logger LOG = LoggerFactory.getLogger(QueuedNotificationManagerTest.class);
private ExecutorService queueExecutor;
@After
public void testNotificationsWithSingleListener() {
queueExecutor = Executors.newFixedThreadPool( 2 );
- NotificationManager<TestListener<Integer>, Integer> manager =
- new QueuedNotificationManager<>( queueExecutor, new TestNotifier<>(),
- 10, "TestMgr" );
+ NotificationManager<TestListener<Integer>, Integer> manager = new QueuedNotificationManager<>(queueExecutor,
+ new TestNotifier<>(), 10, "TestMgr" );
int initialCount = 6;
int nNotifications = 100;
- TestListener<Integer> listener = new TestListener<>( nNotifications, 1 );
+ TestListener<Integer> listener = new TestListener<>(nNotifications, 1);
listener.sleepTime = 20;
- manager.submitNotifications( listener, Arrays.asList( 1, 2 ) );
- manager.submitNotification( listener, 3 );
- manager.submitNotifications( listener, Arrays.asList( 4, 5 ) );
- manager.submitNotification( listener, 6 );
+ manager.submitNotifications(listener, Arrays.asList(1, 2));
+ manager.submitNotification(listener, 3);
+ manager.submitNotifications(listener, Arrays.asList(4, 5));
+ manager.submitNotification(listener, 6);
- manager.submitNotifications( null, Collections.emptyList() );
- manager.submitNotifications( listener, null );
- manager.submitNotification( listener, null );
+ manager.submitNotifications(null, Collections.emptyList());
+ manager.submitNotifications(listener, null);
+ manager.submitNotification(listener, null);
- Uninterruptibles.sleepUninterruptibly( 100, TimeUnit.MILLISECONDS );
+ Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
listener.sleepTime = 0;
- List<Integer> expNotifications = Lists.newArrayListWithCapacity( nNotifications );
- expNotifications.addAll( Arrays.asList( 1, 2, 3, 4, 5, 6 ) );
+ List<Integer> expNotifications = new ArrayList<>(nNotifications);
+ expNotifications.addAll(Arrays.asList(1, 2, 3, 4, 5, 6));
for (int i = 1; i <= nNotifications - initialCount; i++) {
- Integer v = Integer.valueOf( initialCount + i );
- expNotifications.add( v );
- manager.submitNotification( listener, v );
+ Integer v = Integer.valueOf(initialCount + i);
+ expNotifications.add(v);
+ manager.submitNotification(listener, v);
}
listener.verifyNotifications( expNotifications );
}
@Test
- public void testNotificationsWithMultipleListeners() {
+ public void testNotificationsWithMultipleListeners() throws InterruptedException {
int nListeners = 10;
- queueExecutor = Executors.newFixedThreadPool( nListeners );
- final ExecutorService stagingExecutor = Executors.newFixedThreadPool( nListeners );
- final NotificationManager<TestListener<Integer>, Integer> manager =
- new QueuedNotificationManager<>( queueExecutor, new TestNotifier<>(),
- 5000, "TestMgr" );
+ queueExecutor = Executors.newFixedThreadPool(nListeners);
+ final ExecutorService stagingExecutor = Executors.newFixedThreadPool(nListeners);
+ final NotificationManager<TestListener<Integer>, Integer> manager = new QueuedNotificationManager<>(
+ queueExecutor, new TestNotifier<>(), 5000, "TestMgr" );
final int nNotifications = 100000;
- System.out.println( "Testing " + nListeners + " listeners with " + nNotifications +
- " notifications each..." );
+ LOG.info("Testing {} listeners with {} notifications each...", nListeners, nNotifications);
final Integer[] notifications = new Integer[nNotifications];
for (int i = 1; i <= nNotifications; i++) {
- notifications[i-1] = Integer.valueOf( i );
+ notifications[i - 1] = Integer.valueOf(i);
}
Stopwatch stopWatch = Stopwatch.createStarted();
- List<TestListener<Integer>> listeners = Lists.newArrayList();
+ List<TestListener<Integer>> listeners = new ArrayList<>();
+ List<Thread> threads = new ArrayList<>();
for (int i = 1; i <= nListeners; i++) {
final TestListener<Integer> listener =
i == 2 ? new TestListener2<>(nNotifications, i) :
i == 3 ? new TestListener3<>(nNotifications, i) :
new TestListener<>(nNotifications, i);
- listeners.add( listener );
-
- new Thread( new Runnable() {
- @Override
- public void run() {
- for (int j = 1; j <= nNotifications; j++) {
- final Integer n = notifications[j-1];
- stagingExecutor.execute( new Runnable() {
- @Override
- public void run() {
- manager.submitNotification( listener, n );
- }
- } );
- }
+ listeners.add(listener);
+
+ final Thread t = new Thread(() -> {
+ for (int j = 1; j <= nNotifications; j++) {
+ final Integer n = notifications[j - 1];
+ stagingExecutor.execute(() -> manager.submitNotification(listener, n));
}
- } ).start();
+ });
+
+ t.start();
+ threads.add(t);
+
}
try {
for (TestListener<Integer> listener: listeners) {
listener.verifyNotifications();
- System.out.println( listener.name + " succeeded" );
+ LOG.info("{} succeeded", listener.name);
}
} finally {
stagingExecutor.shutdownNow();
stopWatch.stop();
- System.out.println( "Elapsed time: " + stopWatch );
- System.out.println( queueExecutor );
+ LOG.info("Elapsed time: {}", stopWatch);
+ LOG.info("Executor: {}", queueExecutor);
+
+ for (Thread t : threads) {
+ t.join();
+ }
}
@Test(timeout=10000)
public void testNotificationsWithListenerRuntimeEx() {
- queueExecutor = Executors.newFixedThreadPool( 1 );
+ queueExecutor = Executors.newFixedThreadPool(1);
NotificationManager<TestListener<Integer>, Integer> manager =
new QueuedNotificationManager<>( queueExecutor, new TestNotifier<>(),
10, "TestMgr" );
- TestListener<Integer> listener = new TestListener<>( 2, 1 );
- listener.runtimeEx = new RuntimeException( "mock" );
+ TestListener<Integer> listener = new TestListener<>(2, 1);
+ listener.runtimeEx = mock(RuntimeException.class);
- manager.submitNotification( listener, 1 );
- manager.submitNotification( listener, 2 );
+ manager.submitNotification(listener, 1);
+ manager.submitNotification(listener, 2);
listener.verifyNotifications();
}
@Test(timeout=10000)
public void testNotificationsWithListenerJVMError() {
- final CountDownLatch errorCaughtLatch = new CountDownLatch( 1 );
- queueExecutor = new ThreadPoolExecutor( 1, 1, 0, TimeUnit.SECONDS,
- new LinkedBlockingQueue<>() ) {
+ final CountDownLatch errorCaughtLatch = new CountDownLatch(1);
+ queueExecutor = new ThreadPoolExecutor(1, 1, 0, TimeUnit.SECONDS, new LinkedBlockingQueue<>()) {
@Override
- public void execute( final Runnable command ) {
- super.execute( new Runnable() {
- @Override
- public void run() {
- try {
- command.run();
- } catch( Error e ) {
- errorCaughtLatch.countDown();
- }
- }
- });
+ public void execute(final Runnable command) {
+ super.execute(() -> {
+ try {
+ command.run();
+ } catch (Error e) {
+ errorCaughtLatch.countDown();
+ }
+ });
}
};
- NotificationManager<TestListener<Integer>, Integer> manager =
- new QueuedNotificationManager<>( queueExecutor, new TestNotifier<>(),
- 10, "TestMgr" );
+ NotificationManager<TestListener<Integer>, Integer> manager = new QueuedNotificationManager<>(queueExecutor,
+ new TestNotifier<>(), 10, "TestMgr");
- TestListener<Integer> listener = new TestListener<>( 2, 1 );
- listener.jvmError = new Error( "mock" );
+ TestListener<Integer> listener = new TestListener<>(2, 1);
+ listener.jvmError = mock(Error.class);
- manager.submitNotification( listener, 1 );
+ manager.submitNotification(listener, 1);
- assertEquals( "JVM Error caught", true, Uninterruptibles.awaitUninterruptibly(
- errorCaughtLatch, 5, TimeUnit.SECONDS ) );
+ assertEquals("JVM Error caught", true, Uninterruptibles.awaitUninterruptibly(
+ errorCaughtLatch, 5, TimeUnit.SECONDS));
- manager.submitNotification( listener, 2 );
+ manager.submitNotification(listener, 2);
listener.verifyNotifications();
}