X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=common%2Futil%2Fsrc%2Ftest%2Fjava%2Forg%2Fopendaylight%2Fyangtools%2Futil%2Fconcurrent%2FQueuedNotificationManagerTest.java;h=69fa995125bb64b9f323a6548499dbe768be076e;hb=0de136825605fc7c15b49bdb1841ff5ea48d8a21;hp=455ccdd1b0b974ad248bd46b7b01945a54cc1a81;hpb=6b5d20f6513bc3e6e5db4a2058ee81308edaa9c8;p=yangtools.git diff --git a/common/util/src/test/java/org/opendaylight/yangtools/util/concurrent/QueuedNotificationManagerTest.java b/common/util/src/test/java/org/opendaylight/yangtools/util/concurrent/QueuedNotificationManagerTest.java index 455ccdd1b0..69fa995125 100644 --- a/common/util/src/test/java/org/opendaylight/yangtools/util/concurrent/QueuedNotificationManagerTest.java +++ b/common/util/src/test/java/org/opendaylight/yangtools/util/concurrent/QueuedNotificationManagerTest.java @@ -8,9 +8,16 @@ package org.opendaylight.yangtools.util.concurrent; -import static org.junit.Assert.fail; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.fail; +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.doNothing; +import static org.mockito.Mockito.mock; +import com.google.common.base.Stopwatch; +import com.google.common.util.concurrent.Uninterruptibles; +import java.io.PrintStream; +import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.List; @@ -20,13 +27,10 @@ import java.util.concurrent.Executors; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; - import org.junit.After; import org.junit.Test; - -import com.google.common.base.Stopwatch; -import com.google.common.collect.Lists; -import com.google.common.util.concurrent.Uninterruptibles; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * Unit tests for QueuedNotificationManager. @@ -46,37 +50,37 @@ public class QueuedNotificationManagerTest { boolean cacheNotifications = true; String name; - TestListener( int expCount, int id ) { + TestListener(final int expCount, final int id) { name = "TestListener " + id; - actual = Collections.synchronizedList( Lists.newArrayListWithCapacity( expCount ) ); - reset( expCount ); + actual = Collections.synchronizedList(new ArrayList<>(expCount)); + reset(expCount); } - void reset( int expCount ) { + void reset(final int expCount) { this.expCount = expCount; - latch = new CountDownLatch( expCount ); + latch = new CountDownLatch(expCount); actual.clear(); } - void onNotification( N data ) { + void onNotification(final N data) { try { - if( sleepTime > 0 ) { - Uninterruptibles.sleepUninterruptibly( sleepTime, TimeUnit.MILLISECONDS ); + if (sleepTime > 0) { + Uninterruptibles.sleepUninterruptibly( sleepTime, TimeUnit.MILLISECONDS); } - if( cacheNotifications ) { - actual.add( data ); + if (cacheNotifications) { + actual.add(data); } RuntimeException localRuntimeEx = runtimeEx; - if( localRuntimeEx != null ) { + if (localRuntimeEx != null) { runtimeEx = null; throw localRuntimeEx; } Error localJvmError = jvmError; - if( localJvmError != null ) { + if (localJvmError != null) { jvmError = null; throw localJvmError; } @@ -87,58 +91,57 @@ public class QueuedNotificationManagerTest { } void verifyNotifications() { - boolean done = Uninterruptibles.awaitUninterruptibly( latch, 10, TimeUnit.SECONDS ); - if( !done ) { + boolean done = Uninterruptibles.awaitUninterruptibly(latch, 10, TimeUnit.SECONDS); + if (!done) { long actualCount = latch.getCount(); - fail( name + ": Received " + (expCount - actualCount) + - " notifications. Expected " + expCount ); + fail(name + ": Received " + (expCount - actualCount) + " notifications. Expected " + expCount); } } - void verifyNotifications( List expected ) { + void verifyNotifications(final List expected) { verifyNotifications(); - assertEquals( name + ": Notifications", Lists.newArrayList( expected ), actual ); + assertEquals(name + ": Notifications", expected, actual); } // Implement bad hashCode/equals methods to verify it doesn't screw up the // QueuedNotificationManager as it should use reference identity. @Override - public int hashCode(){ + public int hashCode() { return 1; } @Override - public boolean equals( Object obj ){ + public boolean equals(final Object obj) { TestListener other = (TestListener) obj; return other != null; } } static class TestListener2 extends TestListener { - TestListener2( int expCount, int id ) { + TestListener2(final int expCount, final int id) { super(expCount, id); } } static class TestListener3 extends TestListener { - TestListener3( int expCount, int id ) { + TestListener3(final int expCount, final int id) { super(expCount, id); } } - static class TestNotifier implements QueuedNotificationManager.Invoker,N> { - + static class TestNotifier implements QueuedNotificationManager.Invoker, N> { @Override - public void invokeListener( TestListener listener, N notification ) { - listener.onNotification( notification ); + public void invokeListener(final TestListener listener, final N notification) { + listener.onNotification(notification); } } + private static final Logger LOG = LoggerFactory.getLogger(QueuedNotificationManagerTest.class); private ExecutorService queueExecutor; @After public void tearDown() { - if( queueExecutor != null ) { + if (queueExecutor != null) { queueExecutor.shutdownNow(); } } @@ -147,90 +150,84 @@ public class QueuedNotificationManagerTest { public void testNotificationsWithSingleListener() { queueExecutor = Executors.newFixedThreadPool( 2 ); - NotificationManager, Integer> manager = - new QueuedNotificationManager<>( queueExecutor, new TestNotifier<>(), - 10, "TestMgr" ); + NotificationManager, Integer> manager = new QueuedNotificationManager<>(queueExecutor, + new TestNotifier<>(), 10, "TestMgr" ); int initialCount = 6; int nNotifications = 100; - TestListener listener = new TestListener<>( nNotifications, 1 ); + TestListener listener = new TestListener<>(nNotifications, 1); listener.sleepTime = 20; - manager.submitNotifications( listener, Arrays.asList( 1, 2 ) ); - manager.submitNotification( listener, 3 ); - manager.submitNotifications( listener, Arrays.asList( 4, 5 ) ); - manager.submitNotification( listener, 6 ); + manager.submitNotifications(listener, Arrays.asList(1, 2)); + manager.submitNotification(listener, 3); + manager.submitNotifications(listener, Arrays.asList(4, 5)); + manager.submitNotification(listener, 6); - manager.submitNotifications( null, Collections.emptyList() ); - manager.submitNotifications( listener, null ); - manager.submitNotification( listener, null ); + manager.submitNotifications(null, Collections.emptyList()); + manager.submitNotifications(listener, null); + manager.submitNotification(listener, null); - Uninterruptibles.sleepUninterruptibly( 100, TimeUnit.MILLISECONDS ); + Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS); listener.sleepTime = 0; - List expNotifications = Lists.newArrayListWithCapacity( nNotifications ); - expNotifications.addAll( Arrays.asList( 1, 2, 3, 4, 5, 6 ) ); - for( int i = 1; i <= nNotifications - initialCount; i++ ) { - Integer v = Integer.valueOf( initialCount + i ); - expNotifications.add( v ); - manager.submitNotification( listener, v ); + List expNotifications = new ArrayList<>(nNotifications); + expNotifications.addAll(Arrays.asList(1, 2, 3, 4, 5, 6)); + for (int i = 1; i <= nNotifications - initialCount; i++) { + Integer v = Integer.valueOf(initialCount + i); + expNotifications.add(v); + manager.submitNotification(listener, v); } listener.verifyNotifications( expNotifications ); } @Test - public void testNotificationsWithMultipleListeners() { + public void testNotificationsWithMultipleListeners() throws InterruptedException { int nListeners = 10; - queueExecutor = Executors.newFixedThreadPool( nListeners ); - final ExecutorService stagingExecutor = Executors.newFixedThreadPool( nListeners ); - final NotificationManager, Integer> manager = - new QueuedNotificationManager<>( queueExecutor, new TestNotifier<>(), - 5000, "TestMgr" ); + queueExecutor = Executors.newFixedThreadPool(nListeners); + final ExecutorService stagingExecutor = Executors.newFixedThreadPool(nListeners); + final NotificationManager, Integer> manager = new QueuedNotificationManager<>( + queueExecutor, new TestNotifier<>(), 5000, "TestMgr" ); final int nNotifications = 100000; - System.out.println( "Testing " + nListeners + " listeners with " + nNotifications + - " notifications each..." ); + LOG.info("Testing {} listeners with {} notifications each...", nListeners, nNotifications); final Integer[] notifications = new Integer[nNotifications]; - for( int i = 1; i <= nNotifications; i++ ) { - notifications[i-1] = Integer.valueOf( i ); + for (int i = 1; i <= nNotifications; i++) { + notifications[i - 1] = Integer.valueOf(i); } Stopwatch stopWatch = Stopwatch.createStarted(); - List> listeners = Lists.newArrayList(); - for( int i = 1; i <= nListeners; i++ ) { + List> listeners = new ArrayList<>(); + List threads = new ArrayList<>(); + for (int i = 1; i <= nListeners; i++) { final TestListener listener = i == 2 ? new TestListener2<>(nNotifications, i) : i == 3 ? new TestListener3<>(nNotifications, i) : new TestListener<>(nNotifications, i); - listeners.add( listener ); - - new Thread( new Runnable() { - @Override - public void run() { - for( int j = 1; j <= nNotifications; j++ ) { - final Integer n = notifications[j-1]; - stagingExecutor.execute( new Runnable() { - @Override - public void run() { - manager.submitNotification( listener, n ); - } - } ); - } + listeners.add(listener); + + final Thread t = new Thread(() -> { + for (int j = 1; j <= nNotifications; j++) { + final Integer n = notifications[j - 1]; + stagingExecutor.execute(() -> manager.submitNotification(listener, n)); } - } ).start(); + }); + + t.start(); + threads.add(t); + } try { - for( TestListener listener: listeners ) { + for (TestListener listener: listeners) { listener.verifyNotifications(); - System.out.println( listener.name + " succeeded" ); + LOG.info("{} succeeded", listener.name); } } finally { stagingExecutor.shutdownNow(); @@ -238,24 +235,30 @@ public class QueuedNotificationManagerTest { stopWatch.stop(); - System.out.println( "Elapsed time: " + stopWatch ); - System.out.println( queueExecutor ); + LOG.info("Elapsed time: {}", stopWatch); + LOG.info("Executor: {}", queueExecutor); + + for (Thread t : threads) { + t.join(); + } } @Test(timeout=10000) public void testNotificationsWithListenerRuntimeEx() { - queueExecutor = Executors.newFixedThreadPool( 1 ); + queueExecutor = Executors.newFixedThreadPool(1); NotificationManager, Integer> manager = new QueuedNotificationManager<>( queueExecutor, new TestNotifier<>(), 10, "TestMgr" ); - TestListener listener = new TestListener<>( 2, 1 ); - listener.runtimeEx = new RuntimeException( "mock" ); + TestListener listener = new TestListener<>(2, 1); + final RuntimeException mockedRuntimeException = mock(RuntimeException.class); + doNothing().when(mockedRuntimeException).printStackTrace(any(PrintStream.class)); + listener.runtimeEx = mockedRuntimeException; - manager.submitNotification( listener, 1 ); - manager.submitNotification( listener, 2 ); + manager.submitNotification(listener, 1); + manager.submitNotification(listener, 2); listener.verifyNotifications(); } @@ -263,37 +266,32 @@ public class QueuedNotificationManagerTest { @Test(timeout=10000) public void testNotificationsWithListenerJVMError() { - final CountDownLatch errorCaughtLatch = new CountDownLatch( 1 ); - queueExecutor = new ThreadPoolExecutor( 1, 1, 0, TimeUnit.SECONDS, - new LinkedBlockingQueue<>() ) { + final CountDownLatch errorCaughtLatch = new CountDownLatch(1); + queueExecutor = new ThreadPoolExecutor(1, 1, 0, TimeUnit.SECONDS, new LinkedBlockingQueue<>()) { @Override - public void execute( final Runnable command ) { - super.execute( new Runnable() { - @Override - public void run() { - try { - command.run(); - } catch( Error e ) { - errorCaughtLatch.countDown(); - } - } - }); + public void execute(final Runnable command) { + super.execute(() -> { + try { + command.run(); + } catch (Error e) { + errorCaughtLatch.countDown(); + } + }); } }; - NotificationManager, Integer> manager = - new QueuedNotificationManager<>( queueExecutor, new TestNotifier<>(), - 10, "TestMgr" ); + NotificationManager, Integer> manager = new QueuedNotificationManager<>(queueExecutor, + new TestNotifier<>(), 10, "TestMgr"); - TestListener listener = new TestListener<>( 2, 1 ); - listener.jvmError = new Error( "mock" ); + TestListener listener = new TestListener<>(2, 1); + listener.jvmError = mock(Error.class); - manager.submitNotification( listener, 1 ); + manager.submitNotification(listener, 1); - assertEquals( "JVM Error caught", true, Uninterruptibles.awaitUninterruptibly( - errorCaughtLatch, 5, TimeUnit.SECONDS ) ); + assertEquals("JVM Error caught", true, Uninterruptibles.awaitUninterruptibly( + errorCaughtLatch, 5, TimeUnit.SECONDS)); - manager.submitNotification( listener, 2 ); + manager.submitNotification(listener, 2); listener.verifyNotifications(); }