Allow QueuedNotificationManager to batch notifications
[yangtools.git] / common / util / src / test / java / org / opendaylight / yangtools / util / concurrent / QueuedNotificationManagerTest.java
index 3833b9387365ff23370d72d66b5a1ae073bfaac9..f3275802c5869f0270507a936b473bf9e7477a87 100644 (file)
@@ -10,10 +10,11 @@ package org.opendaylight.yangtools.util.concurrent;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.fail;
+import static org.mockito.Mockito.mock;
 
 import com.google.common.base.Stopwatch;
-import com.google.common.collect.Lists;
 import com.google.common.util.concurrent.Uninterruptibles;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
@@ -25,6 +26,8 @@ import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import org.junit.After;
 import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * Unit tests for QueuedNotificationManager.
@@ -44,27 +47,27 @@ public class QueuedNotificationManagerTest {
         boolean cacheNotifications = true;
         String name;
 
-        TestListener( int expCount, int id ) {
+        TestListener(final int expCount, final int id) {
             name = "TestListener " + id;
-            actual = Collections.synchronizedList( Lists.newArrayListWithCapacity( expCount ) );
-            reset( expCount );
+            actual = Collections.synchronizedList(new ArrayList<>(expCount));
+            reset(expCount);
         }
 
-        void reset( int expCount ) {
+        void reset(final int expCount) {
             this.expCount = expCount;
-            latch = new CountDownLatch( expCount );
+            latch = new CountDownLatch(expCount);
             actual.clear();
         }
 
-        void onNotification( N data ) {
+        void onNotification(final N data) {
 
             try {
                 if (sleepTime > 0) {
-                    Uninterruptibles.sleepUninterruptibly( sleepTime, TimeUnit.MILLISECONDS );
+                    Uninterruptibles.sleepUninterruptibly( sleepTime, TimeUnit.MILLISECONDS);
                 }
 
                 if (cacheNotifications) {
-                    actual.add( data );
+                    actual.add(data);
                 }
 
                 RuntimeException localRuntimeEx = runtimeEx;
@@ -85,53 +88,52 @@ public class QueuedNotificationManagerTest {
         }
 
         void verifyNotifications() {
-            boolean done = Uninterruptibles.awaitUninterruptibly( latch, 10, TimeUnit.SECONDS );
+            boolean done = Uninterruptibles.awaitUninterruptibly(latch, 10, TimeUnit.SECONDS);
             if (!done) {
                 long actualCount = latch.getCount();
-                fail( name + ": Received " + (expCount - actualCount) +
-                      " notifications. Expected " + expCount );
+                fail(name + ": Received " + (expCount - actualCount) + " notifications. Expected " + expCount);
             }
         }
 
-        void verifyNotifications( List<N> expected ) {
+        void verifyNotifications(final List<N> expected) {
             verifyNotifications();
-            assertEquals( name + ": Notifications", Lists.newArrayList( expected ), actual );
+            assertEquals(name + ": Notifications", expected, actual);
         }
 
         // Implement bad hashCode/equals methods to verify it doesn't screw up the
         // QueuedNotificationManager as it should use reference identity.
         @Override
-        public int hashCode(){
+        public int hashCode() {
             return 1;
         }
 
         @Override
-        public boolean equals( Object obj ){
+        public boolean equals(final Object obj) {
             TestListener<?> other = (TestListener<?>) obj;
             return other != null;
         }
     }
 
     static class TestListener2<N> extends TestListener<N> {
-        TestListener2( int expCount, int id  ) {
+        TestListener2(final int expCount, final int id) {
             super(expCount, id);
         }
     }
 
     static class TestListener3<N> extends TestListener<N> {
-        TestListener3( int expCount, int id ) {
+        TestListener3(final int expCount, final int id) {
             super(expCount, id);
         }
     }
 
-    static class TestNotifier<N> implements QueuedNotificationManager.Invoker<TestListener<N>,N> {
-
+    static class TestNotifier<N> implements QueuedNotificationManager.Invoker<TestListener<N>, N> {
         @Override
-        public void invokeListener( TestListener<N> listener, N notification ) {
-            listener.onNotification( notification );
+        public void invokeListener(final TestListener<N> listener, final N notification) {
+            listener.onNotification(notification);
         }
     }
 
+    private static final Logger LOG = LoggerFactory.getLogger(QueuedNotificationManagerTest.class);
     private ExecutorService queueExecutor;
 
     @After
@@ -145,90 +147,84 @@ public class QueuedNotificationManagerTest {
     public void testNotificationsWithSingleListener() {
 
         queueExecutor = Executors.newFixedThreadPool( 2 );
-        NotificationManager<TestListener<Integer>, Integer> manager =
-                new QueuedNotificationManager<>( queueExecutor, new TestNotifier<>(),
-                10, "TestMgr" );
+        NotificationManager<TestListener<Integer>, Integer> manager = new QueuedNotificationManager<>(queueExecutor,
+                new TestNotifier<>(), 10, "TestMgr" );
 
         int initialCount = 6;
         int nNotifications = 100;
 
-        TestListener<Integer> listener = new TestListener<>( nNotifications, 1 );
+        TestListener<Integer> listener = new TestListener<>(nNotifications, 1);
         listener.sleepTime = 20;
 
-        manager.submitNotifications( listener, Arrays.asList( 1, 2 ) );
-        manager.submitNotification( listener, 3 );
-        manager.submitNotifications( listener, Arrays.asList( 4, 5 ) );
-        manager.submitNotification( listener, 6 );
+        manager.submitNotifications(listener, Arrays.asList(1, 2));
+        manager.submitNotification(listener, 3);
+        manager.submitNotifications(listener, Arrays.asList(4, 5));
+        manager.submitNotification(listener, 6);
 
-        manager.submitNotifications( null, Collections.emptyList() );
-        manager.submitNotifications( listener, null );
-        manager.submitNotification( listener, null );
+        manager.submitNotifications(null, Collections.emptyList());
+        manager.submitNotifications(listener, null);
+        manager.submitNotification(listener, null);
 
-        Uninterruptibles.sleepUninterruptibly( 100, TimeUnit.MILLISECONDS );
+        Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
 
         listener.sleepTime = 0;
 
-        List<Integer> expNotifications = Lists.newArrayListWithCapacity( nNotifications );
-        expNotifications.addAll( Arrays.asList( 1, 2, 3, 4, 5, 6 ) );
+        List<Integer> expNotifications = new ArrayList<>(nNotifications);
+        expNotifications.addAll(Arrays.asList(1, 2, 3, 4, 5, 6));
         for (int i = 1; i <= nNotifications - initialCount; i++) {
-            Integer v = Integer.valueOf( initialCount + i );
-            expNotifications.add( v );
-            manager.submitNotification( listener, v );
+            Integer v = Integer.valueOf(initialCount + i);
+            expNotifications.add(v);
+            manager.submitNotification(listener, v);
         }
 
         listener.verifyNotifications( expNotifications );
     }
 
     @Test
-    public void testNotificationsWithMultipleListeners() {
+    public void testNotificationsWithMultipleListeners() throws InterruptedException {
 
         int nListeners = 10;
-        queueExecutor = Executors.newFixedThreadPool( nListeners );
-        final ExecutorService stagingExecutor = Executors.newFixedThreadPool( nListeners );
-        final NotificationManager<TestListener<Integer>, Integer> manager =
-                new QueuedNotificationManager<>( queueExecutor, new TestNotifier<>(),
-                5000, "TestMgr" );
+        queueExecutor = Executors.newFixedThreadPool(nListeners);
+        final ExecutorService stagingExecutor = Executors.newFixedThreadPool(nListeners);
+        final NotificationManager<TestListener<Integer>, Integer> manager = new QueuedNotificationManager<>(
+                queueExecutor, new TestNotifier<>(), 5000, "TestMgr" );
 
         final int nNotifications = 100000;
 
-        System.out.println( "Testing " + nListeners + " listeners with " + nNotifications +
-                            " notifications each..." );
+        LOG.info("Testing {} listeners with {} notifications each...",  nListeners, nNotifications);
 
         final Integer[] notifications = new Integer[nNotifications];
         for (int i = 1; i <= nNotifications; i++) {
-            notifications[i-1] = Integer.valueOf( i );
+            notifications[i - 1] = Integer.valueOf(i);
         }
 
         Stopwatch stopWatch = Stopwatch.createStarted();
 
-        List<TestListener<Integer>> listeners = Lists.newArrayList();
+        List<TestListener<Integer>> listeners = new ArrayList<>();
+        List<Thread> threads = new ArrayList<>();
         for (int i = 1; i <= nListeners; i++) {
             final TestListener<Integer> listener =
                     i == 2 ? new TestListener2<>(nNotifications, i) :
                     i == 3 ? new TestListener3<>(nNotifications, i) :
                             new TestListener<>(nNotifications, i);
-            listeners.add( listener );
-
-            new Thread( new Runnable() {
-                @Override
-                public void run() {
-                    for (int j = 1; j <= nNotifications; j++) {
-                        final Integer n = notifications[j-1];
-                        stagingExecutor.execute( new Runnable() {
-                            @Override
-                            public void run() {
-                                manager.submitNotification( listener, n );
-                            }
-                        } );
-                    }
+            listeners.add(listener);
+
+            final Thread t = new Thread(() -> {
+                for (int j = 1; j <= nNotifications; j++) {
+                    final Integer n = notifications[j - 1];
+                    stagingExecutor.execute(() -> manager.submitNotification(listener, n));
                 }
-            } ).start();
+            });
+
+            t.start();
+            threads.add(t);
+
         }
 
         try {
             for (TestListener<Integer> listener: listeners) {
                 listener.verifyNotifications();
-                System.out.println( listener.name + " succeeded" );
+                LOG.info("{} succeeded", listener.name);
             }
         } finally {
             stagingExecutor.shutdownNow();
@@ -236,24 +232,28 @@ public class QueuedNotificationManagerTest {
 
         stopWatch.stop();
 
-        System.out.println( "Elapsed time: " + stopWatch );
-        System.out.println( queueExecutor );
+        LOG.info("Elapsed time: {}", stopWatch);
+        LOG.info("Executor: {}", queueExecutor);
+
+        for (Thread t : threads) {
+            t.join();
+        }
     }
 
     @Test(timeout=10000)
     public void testNotificationsWithListenerRuntimeEx() {
 
-        queueExecutor = Executors.newFixedThreadPool( 1 );
+        queueExecutor = Executors.newFixedThreadPool(1);
         NotificationManager<TestListener<Integer>, Integer> manager =
                 new QueuedNotificationManager<>( queueExecutor, new TestNotifier<>(),
                 10, "TestMgr" );
 
 
-        TestListener<Integer> listener = new TestListener<>( 2, 1 );
-        listener.runtimeEx = new RuntimeException( "mock" );
+        TestListener<Integer> listener = new TestListener<>(2, 1);
+        listener.runtimeEx = mock(RuntimeException.class);
 
-        manager.submitNotification( listener, 1 );
-        manager.submitNotification( listener, 2 );
+        manager.submitNotification(listener, 1);
+        manager.submitNotification(listener, 2);
 
         listener.verifyNotifications();
     }
@@ -261,37 +261,32 @@ public class QueuedNotificationManagerTest {
     @Test(timeout=10000)
     public void testNotificationsWithListenerJVMError() {
 
-        final CountDownLatch errorCaughtLatch = new CountDownLatch( 1 );
-        queueExecutor = new ThreadPoolExecutor( 1, 1, 0, TimeUnit.SECONDS,
-                new LinkedBlockingQueue<>() ) {
+        final CountDownLatch errorCaughtLatch = new CountDownLatch(1);
+        queueExecutor = new ThreadPoolExecutor(1, 1, 0, TimeUnit.SECONDS, new LinkedBlockingQueue<>()) {
              @Override
-             public void execute( final Runnable command ) {
-                 super.execute( new Runnable() {
-                    @Override
-                    public void run() {
-                        try {
-                            command.run();
-                        } catch( Error e ) {
-                            errorCaughtLatch.countDown();
-                        }
-                    }
-                });
+             public void execute(final Runnable command) {
+                 super.execute(() -> {
+                     try {
+                         command.run();
+                     } catch (Error e) {
+                         errorCaughtLatch.countDown();
+                     }
+                 });
              }
         };
 
-        NotificationManager<TestListener<Integer>, Integer> manager =
-                new QueuedNotificationManager<>( queueExecutor, new TestNotifier<>(),
-                10, "TestMgr" );
+        NotificationManager<TestListener<Integer>, Integer> manager = new QueuedNotificationManager<>(queueExecutor,
+                new TestNotifier<>(), 10, "TestMgr");
 
-        TestListener<Integer> listener = new TestListener<>( 2, 1 );
-        listener.jvmError = new Error( "mock" );
+        TestListener<Integer> listener = new TestListener<>(2, 1);
+        listener.jvmError = mock(Error.class);
 
-        manager.submitNotification( listener, 1 );
+        manager.submitNotification(listener, 1);
 
-        assertEquals( "JVM Error caught", true, Uninterruptibles.awaitUninterruptibly(
-                                                       errorCaughtLatch, 5, TimeUnit.SECONDS ) );
+        assertEquals("JVM Error caught", true, Uninterruptibles.awaitUninterruptibly(
+                                                       errorCaughtLatch, 5, TimeUnit.SECONDS));
 
-        manager.submitNotification( listener, 2 );
+        manager.submitNotification(listener, 2);
 
         listener.verifyNotifications();
     }