package org.opendaylight.yangtools.util.concurrent;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
-import static org.mockito.Matchers.any;
-import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.mock;
import com.google.common.base.Stopwatch;
import com.google.common.util.concurrent.Uninterruptibles;
-import java.io.PrintStream;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.junit.After;
import org.junit.Test;
+import org.opendaylight.yangtools.util.concurrent.QueuedNotificationManager.BatchedInvoker;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
actual.clear();
}
- void onNotification(final N data) {
+ void onNotification(final Collection<? extends N> data) {
try {
if (sleepTime > 0) {
- Uninterruptibles.sleepUninterruptibly( sleepTime, TimeUnit.MILLISECONDS);
+ Uninterruptibles.sleepUninterruptibly(sleepTime, TimeUnit.MILLISECONDS);
}
if (cacheNotifications) {
- actual.add(data);
+ actual.addAll(data);
}
RuntimeException localRuntimeEx = runtimeEx;
}
} finally {
- latch.countDown();
+ data.forEach(action -> latch.countDown());
}
}
void verifyNotifications() {
- boolean done = Uninterruptibles.awaitUninterruptibly(latch, 10, TimeUnit.SECONDS);
+ boolean done = Uninterruptibles.awaitUninterruptibly(latch, 5, TimeUnit.SECONDS);
if (!done) {
long actualCount = latch.getCount();
fail(name + ": Received " + (expCount - actualCount) + " notifications. Expected " + expCount);
}
}
- static class TestNotifier<N> implements QueuedNotificationManager.Invoker<TestListener<N>, N> {
+ static class TestNotifier<N> implements BatchedInvoker<TestListener<N>, N> {
@Override
- public void invokeListener(final TestListener<N> listener, final N notification) {
- listener.onNotification(notification);
+ public void invokeListener(final TestListener<N> listener, final Collection<? extends N> notifications) {
+ listener.onNotification(notifications);
}
}
}
}
- @Test(timeout=10000)
+ @Test(timeout = 10000)
public void testNotificationsWithSingleListener() {
- queueExecutor = Executors.newFixedThreadPool( 2 );
- NotificationManager<TestListener<Integer>, Integer> manager = new QueuedNotificationManager<>(queueExecutor,
- new TestNotifier<>(), 10, "TestMgr" );
+ queueExecutor = Executors.newFixedThreadPool(2);
+ NotificationManager<TestListener<Integer>, Integer> manager = QueuedNotificationManager.create(queueExecutor,
+ new TestNotifier<>(), 10, "TestMgr");
- int initialCount = 6;
- int nNotifications = 100;
+ int count = 100;
- TestListener<Integer> listener = new TestListener<>(nNotifications, 1);
+ TestListener<Integer> listener = new TestListener<>(count, 1);
listener.sleepTime = 20;
manager.submitNotifications(listener, Arrays.asList(1, 2));
listener.sleepTime = 0;
- List<Integer> expNotifications = new ArrayList<>(nNotifications);
+ List<Integer> expNotifications = new ArrayList<>(count);
expNotifications.addAll(Arrays.asList(1, 2, 3, 4, 5, 6));
- for (int i = 1; i <= nNotifications - initialCount; i++) {
- Integer v = Integer.valueOf(initialCount + i);
- expNotifications.add(v);
- manager.submitNotification(listener, v);
+ int initialCount = 6;
+ for (int i = 1; i <= count - initialCount; i++) {
+ Integer val = Integer.valueOf(initialCount + i);
+ expNotifications.add(val);
+ manager.submitNotification(listener, val);
}
- listener.verifyNotifications( expNotifications );
+ listener.verifyNotifications(expNotifications);
}
@Test
public void testNotificationsWithMultipleListeners() throws InterruptedException {
- int nListeners = 10;
- queueExecutor = Executors.newFixedThreadPool(nListeners);
- final ExecutorService stagingExecutor = Executors.newFixedThreadPool(nListeners);
- final NotificationManager<TestListener<Integer>, Integer> manager = new QueuedNotificationManager<>(
- queueExecutor, new TestNotifier<>(), 5000, "TestMgr" );
+ int count = 10;
+ queueExecutor = Executors.newFixedThreadPool(count);
+ final ExecutorService stagingExecutor = Executors.newFixedThreadPool(count);
+ final NotificationManager<TestListener<Integer>, Integer> manager = QueuedNotificationManager.create(
+ queueExecutor, new TestNotifier<>(), 5000, "TestMgr");
final int nNotifications = 100000;
- LOG.info("Testing {} listeners with {} notifications each...", nListeners, nNotifications);
+ LOG.info("Testing {} listeners with {} notifications each...", count, nNotifications);
final Integer[] notifications = new Integer[nNotifications];
for (int i = 1; i <= nNotifications; i++) {
List<TestListener<Integer>> listeners = new ArrayList<>();
List<Thread> threads = new ArrayList<>();
- for (int i = 1; i <= nListeners; i++) {
+ for (int i = 1; i <= count; i++) {
final TestListener<Integer> listener =
i == 2 ? new TestListener2<>(nNotifications, i) :
i == 3 ? new TestListener3<>(nNotifications, i) :
}
}
- @Test(timeout=10000)
+ @Test(timeout = 10000)
public void testNotificationsWithListenerRuntimeEx() {
queueExecutor = Executors.newFixedThreadPool(1);
- NotificationManager<TestListener<Integer>, Integer> manager =
- new QueuedNotificationManager<>( queueExecutor, new TestNotifier<>(),
- 10, "TestMgr" );
-
+ NotificationManager<TestListener<Integer>, Integer> manager = QueuedNotificationManager.create(queueExecutor,
+ new TestNotifier<>(), 10, "TestMgr");
TestListener<Integer> listener = new TestListener<>(2, 1);
- final RuntimeException mockedRuntimeException = mock(RuntimeException.class);
- doNothing().when(mockedRuntimeException).printStackTrace(any(PrintStream.class));
+ final RuntimeException mockedRuntimeException = new RuntimeException("mock");
listener.runtimeEx = mockedRuntimeException;
manager.submitNotification(listener, 1);
manager.submitNotification(listener, 2);
listener.verifyNotifications();
+ List<Runnable> tasks = queueExecutor.shutdownNow();
+ assertTrue(tasks.isEmpty());
}
- @Test(timeout=10000)
+ @Test(timeout = 10000)
public void testNotificationsWithListenerJVMError() {
final CountDownLatch errorCaughtLatch = new CountDownLatch(1);
queueExecutor = new ThreadPoolExecutor(1, 1, 0, TimeUnit.SECONDS, new LinkedBlockingQueue<>()) {
- @Override
- public void execute(final Runnable command) {
- super.execute(() -> {
- try {
- command.run();
- } catch (Error e) {
- errorCaughtLatch.countDown();
- }
- });
- }
+ @Override
+ @SuppressWarnings("checkstyle:illegalCatch")
+ public void execute(final Runnable command) {
+ super.execute(() -> {
+ try {
+ command.run();
+ } catch (Error e) {
+ errorCaughtLatch.countDown();
+ }
+ });
+ }
};
- NotificationManager<TestListener<Integer>, Integer> manager = new QueuedNotificationManager<>(queueExecutor,
+ NotificationManager<TestListener<Integer>, Integer> manager = QueuedNotificationManager.create(queueExecutor,
new TestNotifier<>(), 10, "TestMgr");
TestListener<Integer> listener = new TestListener<>(2, 1);
manager.submitNotification(listener, 1);
- assertEquals("JVM Error caught", true, Uninterruptibles.awaitUninterruptibly(
- errorCaughtLatch, 5, TimeUnit.SECONDS));
+ assertTrue("JVM Error caught", Uninterruptibles.awaitUninterruptibly(errorCaughtLatch, 5, TimeUnit.SECONDS));
manager.submitNotification(listener, 2);
listener.verifyNotifications();
+ List<Runnable> tasks = queueExecutor.shutdownNow();
+ assertTrue(tasks.isEmpty());
}
}