X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=common%2Futil%2Fsrc%2Ftest%2Fjava%2Forg%2Fopendaylight%2Fyangtools%2Futil%2Fconcurrent%2FQueuedNotificationManagerTest.java;h=8d6c9d473fad50d862011be480cfaec7f768bd41;hb=6bb7f3a20168a59eeeea366d7d30fa29702e522f;hp=69fa995125bb64b9f323a6548499dbe768be076e;hpb=ac6b1c788424ca50221ce7b243e4255a8b6fe4c4;p=yangtools.git diff --git a/common/util/src/test/java/org/opendaylight/yangtools/util/concurrent/QueuedNotificationManagerTest.java b/common/util/src/test/java/org/opendaylight/yangtools/util/concurrent/QueuedNotificationManagerTest.java index 69fa995125..8d6c9d473f 100644 --- a/common/util/src/test/java/org/opendaylight/yangtools/util/concurrent/QueuedNotificationManagerTest.java +++ b/common/util/src/test/java/org/opendaylight/yangtools/util/concurrent/QueuedNotificationManagerTest.java @@ -9,16 +9,15 @@ package org.opendaylight.yangtools.util.concurrent; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; -import static org.mockito.Matchers.any; -import static org.mockito.Mockito.doNothing; import static org.mockito.Mockito.mock; import com.google.common.base.Stopwatch; import com.google.common.util.concurrent.Uninterruptibles; -import java.io.PrintStream; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collection; import java.util.Collections; import java.util.List; import java.util.concurrent.CountDownLatch; @@ -29,6 +28,7 @@ import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import org.junit.After; import org.junit.Test; +import org.opendaylight.yangtools.util.concurrent.QueuedNotificationManager.BatchedInvoker; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -62,15 +62,15 @@ public class QueuedNotificationManagerTest { actual.clear(); } - void onNotification(final N data) { + void onNotification(final Collection data) { try { if (sleepTime > 0) { - Uninterruptibles.sleepUninterruptibly( sleepTime, TimeUnit.MILLISECONDS); + Uninterruptibles.sleepUninterruptibly(sleepTime, TimeUnit.MILLISECONDS); } if (cacheNotifications) { - actual.add(data); + actual.addAll(data); } RuntimeException localRuntimeEx = runtimeEx; @@ -86,12 +86,12 @@ public class QueuedNotificationManagerTest { } } finally { - latch.countDown(); + data.forEach(action -> latch.countDown()); } } void verifyNotifications() { - boolean done = Uninterruptibles.awaitUninterruptibly(latch, 10, TimeUnit.SECONDS); + boolean done = Uninterruptibles.awaitUninterruptibly(latch, 5, TimeUnit.SECONDS); if (!done) { long actualCount = latch.getCount(); fail(name + ": Received " + (expCount - actualCount) + " notifications. Expected " + expCount); @@ -129,10 +129,10 @@ public class QueuedNotificationManagerTest { } } - static class TestNotifier implements QueuedNotificationManager.Invoker, N> { + static class TestNotifier implements BatchedInvoker, N> { @Override - public void invokeListener(final TestListener listener, final N notification) { - listener.onNotification(notification); + public void invokeListener(final TestListener listener, final Collection notifications) { + listener.onNotification(notifications); } } @@ -146,17 +146,16 @@ public class QueuedNotificationManagerTest { } } - @Test(timeout=10000) + @Test(timeout = 10000) public void testNotificationsWithSingleListener() { - queueExecutor = Executors.newFixedThreadPool( 2 ); - NotificationManager, Integer> manager = new QueuedNotificationManager<>(queueExecutor, - new TestNotifier<>(), 10, "TestMgr" ); + queueExecutor = Executors.newFixedThreadPool(2); + NotificationManager, Integer> manager = QueuedNotificationManager.create(queueExecutor, + new TestNotifier<>(), 10, "TestMgr"); - int initialCount = 6; - int nNotifications = 100; + int count = 100; - TestListener listener = new TestListener<>(nNotifications, 1); + TestListener listener = new TestListener<>(count, 1); listener.sleepTime = 20; manager.submitNotifications(listener, Arrays.asList(1, 2)); @@ -172,29 +171,30 @@ public class QueuedNotificationManagerTest { listener.sleepTime = 0; - List expNotifications = new ArrayList<>(nNotifications); + List expNotifications = new ArrayList<>(count); expNotifications.addAll(Arrays.asList(1, 2, 3, 4, 5, 6)); - for (int i = 1; i <= nNotifications - initialCount; i++) { - Integer v = Integer.valueOf(initialCount + i); - expNotifications.add(v); - manager.submitNotification(listener, v); + int initialCount = 6; + for (int i = 1; i <= count - initialCount; i++) { + Integer val = Integer.valueOf(initialCount + i); + expNotifications.add(val); + manager.submitNotification(listener, val); } - listener.verifyNotifications( expNotifications ); + listener.verifyNotifications(expNotifications); } @Test public void testNotificationsWithMultipleListeners() throws InterruptedException { - int nListeners = 10; - queueExecutor = Executors.newFixedThreadPool(nListeners); - final ExecutorService stagingExecutor = Executors.newFixedThreadPool(nListeners); - final NotificationManager, Integer> manager = new QueuedNotificationManager<>( - queueExecutor, new TestNotifier<>(), 5000, "TestMgr" ); + int count = 10; + queueExecutor = Executors.newFixedThreadPool(count); + final ExecutorService stagingExecutor = Executors.newFixedThreadPool(count); + final NotificationManager, Integer> manager = QueuedNotificationManager.create( + queueExecutor, new TestNotifier<>(), 5000, "TestMgr"); final int nNotifications = 100000; - LOG.info("Testing {} listeners with {} notifications each...", nListeners, nNotifications); + LOG.info("Testing {} listeners with {} notifications each...", count, nNotifications); final Integer[] notifications = new Integer[nNotifications]; for (int i = 1; i <= nNotifications; i++) { @@ -205,7 +205,7 @@ public class QueuedNotificationManagerTest { List> listeners = new ArrayList<>(); List threads = new ArrayList<>(); - for (int i = 1; i <= nListeners; i++) { + for (int i = 1; i <= count; i++) { final TestListener listener = i == 2 ? new TestListener2<>(nNotifications, i) : i == 3 ? new TestListener3<>(nNotifications, i) : @@ -243,44 +243,44 @@ public class QueuedNotificationManagerTest { } } - @Test(timeout=10000) + @Test(timeout = 10000) public void testNotificationsWithListenerRuntimeEx() { queueExecutor = Executors.newFixedThreadPool(1); - NotificationManager, Integer> manager = - new QueuedNotificationManager<>( queueExecutor, new TestNotifier<>(), - 10, "TestMgr" ); - + NotificationManager, Integer> manager = QueuedNotificationManager.create(queueExecutor, + new TestNotifier<>(), 10, "TestMgr"); TestListener listener = new TestListener<>(2, 1); - final RuntimeException mockedRuntimeException = mock(RuntimeException.class); - doNothing().when(mockedRuntimeException).printStackTrace(any(PrintStream.class)); + final RuntimeException mockedRuntimeException = new RuntimeException("mock"); listener.runtimeEx = mockedRuntimeException; manager.submitNotification(listener, 1); manager.submitNotification(listener, 2); listener.verifyNotifications(); + List tasks = queueExecutor.shutdownNow(); + assertTrue(tasks.isEmpty()); } - @Test(timeout=10000) + @Test(timeout = 10000) public void testNotificationsWithListenerJVMError() { final CountDownLatch errorCaughtLatch = new CountDownLatch(1); queueExecutor = new ThreadPoolExecutor(1, 1, 0, TimeUnit.SECONDS, new LinkedBlockingQueue<>()) { - @Override - public void execute(final Runnable command) { - super.execute(() -> { - try { - command.run(); - } catch (Error e) { - errorCaughtLatch.countDown(); - } - }); - } + @Override + @SuppressWarnings("checkstyle:illegalCatch") + public void execute(final Runnable command) { + super.execute(() -> { + try { + command.run(); + } catch (Error e) { + errorCaughtLatch.countDown(); + } + }); + } }; - NotificationManager, Integer> manager = new QueuedNotificationManager<>(queueExecutor, + NotificationManager, Integer> manager = QueuedNotificationManager.create(queueExecutor, new TestNotifier<>(), 10, "TestMgr"); TestListener listener = new TestListener<>(2, 1); @@ -288,11 +288,12 @@ public class QueuedNotificationManagerTest { manager.submitNotification(listener, 1); - assertEquals("JVM Error caught", true, Uninterruptibles.awaitUninterruptibly( - errorCaughtLatch, 5, TimeUnit.SECONDS)); + assertTrue("JVM Error caught", Uninterruptibles.awaitUninterruptibly(errorCaughtLatch, 5, TimeUnit.SECONDS)); manager.submitNotification(listener, 2); listener.verifyNotifications(); + List tasks = queueExecutor.shutdownNow(); + assertTrue(tasks.isEmpty()); } }