3833b9387365ff23370d72d66b5a1ae073bfaac9
[yangtools.git] / common / util / src / test / java / org / opendaylight / yangtools / util / concurrent / QueuedNotificationManagerTest.java
1 /*
2  * Copyright (c) 2014 Brocade Communications Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.yangtools.util.concurrent;
10
11 import static org.junit.Assert.assertEquals;
12 import static org.junit.Assert.fail;
13
14 import com.google.common.base.Stopwatch;
15 import com.google.common.collect.Lists;
16 import com.google.common.util.concurrent.Uninterruptibles;
17 import java.util.Arrays;
18 import java.util.Collections;
19 import java.util.List;
20 import java.util.concurrent.CountDownLatch;
21 import java.util.concurrent.ExecutorService;
22 import java.util.concurrent.Executors;
23 import java.util.concurrent.LinkedBlockingQueue;
24 import java.util.concurrent.ThreadPoolExecutor;
25 import java.util.concurrent.TimeUnit;
26 import org.junit.After;
27 import org.junit.Test;
28
29 /**
30  * Unit tests for QueuedNotificationManager.
31  *
32  * @author Thomas Pantelis
33  */
34 public class QueuedNotificationManagerTest {
35
36     static class TestListener<N> {
37
38         private final List<N> actual;
39         private volatile int expCount;
40         private volatile CountDownLatch latch;
41         volatile long sleepTime = 0;
42         volatile RuntimeException runtimeEx;
43         volatile Error jvmError;
44         boolean cacheNotifications = true;
45         String name;
46
47         TestListener( int expCount, int id ) {
48             name = "TestListener " + id;
49             actual = Collections.synchronizedList( Lists.newArrayListWithCapacity( expCount ) );
50             reset( expCount );
51         }
52
53         void reset( int expCount ) {
54             this.expCount = expCount;
55             latch = new CountDownLatch( expCount );
56             actual.clear();
57         }
58
59         void onNotification( N data ) {
60
61             try {
62                 if (sleepTime > 0) {
63                     Uninterruptibles.sleepUninterruptibly( sleepTime, TimeUnit.MILLISECONDS );
64                 }
65
66                 if (cacheNotifications) {
67                     actual.add( data );
68                 }
69
70                 RuntimeException localRuntimeEx = runtimeEx;
71                 if (localRuntimeEx != null) {
72                     runtimeEx = null;
73                     throw localRuntimeEx;
74                 }
75
76                 Error localJvmError = jvmError;
77                 if (localJvmError != null) {
78                     jvmError = null;
79                     throw localJvmError;
80                 }
81
82             } finally {
83                 latch.countDown();
84             }
85         }
86
87         void verifyNotifications() {
88             boolean done = Uninterruptibles.awaitUninterruptibly( latch, 10, TimeUnit.SECONDS );
89             if (!done) {
90                 long actualCount = latch.getCount();
91                 fail( name + ": Received " + (expCount - actualCount) +
92                       " notifications. Expected " + expCount );
93             }
94         }
95
96         void verifyNotifications( List<N> expected ) {
97             verifyNotifications();
98             assertEquals( name + ": Notifications", Lists.newArrayList( expected ), actual );
99         }
100
101         // Implement bad hashCode/equals methods to verify it doesn't screw up the
102         // QueuedNotificationManager as it should use reference identity.
103         @Override
104         public int hashCode(){
105             return 1;
106         }
107
108         @Override
109         public boolean equals( Object obj ){
110             TestListener<?> other = (TestListener<?>) obj;
111             return other != null;
112         }
113     }
114
115     static class TestListener2<N> extends TestListener<N> {
116         TestListener2( int expCount, int id  ) {
117             super(expCount, id);
118         }
119     }
120
121     static class TestListener3<N> extends TestListener<N> {
122         TestListener3( int expCount, int id ) {
123             super(expCount, id);
124         }
125     }
126
127     static class TestNotifier<N> implements QueuedNotificationManager.Invoker<TestListener<N>,N> {
128
129         @Override
130         public void invokeListener( TestListener<N> listener, N notification ) {
131             listener.onNotification( notification );
132         }
133     }
134
135     private ExecutorService queueExecutor;
136
137     @After
138     public void tearDown() {
139         if (queueExecutor != null) {
140             queueExecutor.shutdownNow();
141         }
142     }
143
144     @Test(timeout=10000)
145     public void testNotificationsWithSingleListener() {
146
147         queueExecutor = Executors.newFixedThreadPool( 2 );
148         NotificationManager<TestListener<Integer>, Integer> manager =
149                 new QueuedNotificationManager<>( queueExecutor, new TestNotifier<>(),
150                 10, "TestMgr" );
151
152         int initialCount = 6;
153         int nNotifications = 100;
154
155         TestListener<Integer> listener = new TestListener<>( nNotifications, 1 );
156         listener.sleepTime = 20;
157
158         manager.submitNotifications( listener, Arrays.asList( 1, 2 ) );
159         manager.submitNotification( listener, 3 );
160         manager.submitNotifications( listener, Arrays.asList( 4, 5 ) );
161         manager.submitNotification( listener, 6 );
162
163         manager.submitNotifications( null, Collections.emptyList() );
164         manager.submitNotifications( listener, null );
165         manager.submitNotification( listener, null );
166
167         Uninterruptibles.sleepUninterruptibly( 100, TimeUnit.MILLISECONDS );
168
169         listener.sleepTime = 0;
170
171         List<Integer> expNotifications = Lists.newArrayListWithCapacity( nNotifications );
172         expNotifications.addAll( Arrays.asList( 1, 2, 3, 4, 5, 6 ) );
173         for (int i = 1; i <= nNotifications - initialCount; i++) {
174             Integer v = Integer.valueOf( initialCount + i );
175             expNotifications.add( v );
176             manager.submitNotification( listener, v );
177         }
178
179         listener.verifyNotifications( expNotifications );
180     }
181
182     @Test
183     public void testNotificationsWithMultipleListeners() {
184
185         int nListeners = 10;
186         queueExecutor = Executors.newFixedThreadPool( nListeners );
187         final ExecutorService stagingExecutor = Executors.newFixedThreadPool( nListeners );
188         final NotificationManager<TestListener<Integer>, Integer> manager =
189                 new QueuedNotificationManager<>( queueExecutor, new TestNotifier<>(),
190                 5000, "TestMgr" );
191
192         final int nNotifications = 100000;
193
194         System.out.println( "Testing " + nListeners + " listeners with " + nNotifications +
195                             " notifications each..." );
196
197         final Integer[] notifications = new Integer[nNotifications];
198         for (int i = 1; i <= nNotifications; i++) {
199             notifications[i-1] = Integer.valueOf( i );
200         }
201
202         Stopwatch stopWatch = Stopwatch.createStarted();
203
204         List<TestListener<Integer>> listeners = Lists.newArrayList();
205         for (int i = 1; i <= nListeners; i++) {
206             final TestListener<Integer> listener =
207                     i == 2 ? new TestListener2<>(nNotifications, i) :
208                     i == 3 ? new TestListener3<>(nNotifications, i) :
209                             new TestListener<>(nNotifications, i);
210             listeners.add( listener );
211
212             new Thread( new Runnable() {
213                 @Override
214                 public void run() {
215                     for (int j = 1; j <= nNotifications; j++) {
216                         final Integer n = notifications[j-1];
217                         stagingExecutor.execute( new Runnable() {
218                             @Override
219                             public void run() {
220                                 manager.submitNotification( listener, n );
221                             }
222                         } );
223                     }
224                 }
225             } ).start();
226         }
227
228         try {
229             for (TestListener<Integer> listener: listeners) {
230                 listener.verifyNotifications();
231                 System.out.println( listener.name + " succeeded" );
232             }
233         } finally {
234             stagingExecutor.shutdownNow();
235         }
236
237         stopWatch.stop();
238
239         System.out.println( "Elapsed time: " + stopWatch );
240         System.out.println( queueExecutor );
241     }
242
243     @Test(timeout=10000)
244     public void testNotificationsWithListenerRuntimeEx() {
245
246         queueExecutor = Executors.newFixedThreadPool( 1 );
247         NotificationManager<TestListener<Integer>, Integer> manager =
248                 new QueuedNotificationManager<>( queueExecutor, new TestNotifier<>(),
249                 10, "TestMgr" );
250
251
252         TestListener<Integer> listener = new TestListener<>( 2, 1 );
253         listener.runtimeEx = new RuntimeException( "mock" );
254
255         manager.submitNotification( listener, 1 );
256         manager.submitNotification( listener, 2 );
257
258         listener.verifyNotifications();
259     }
260
261     @Test(timeout=10000)
262     public void testNotificationsWithListenerJVMError() {
263
264         final CountDownLatch errorCaughtLatch = new CountDownLatch( 1 );
265         queueExecutor = new ThreadPoolExecutor( 1, 1, 0, TimeUnit.SECONDS,
266                 new LinkedBlockingQueue<>() ) {
267              @Override
268              public void execute( final Runnable command ) {
269                  super.execute( new Runnable() {
270                     @Override
271                     public void run() {
272                         try {
273                             command.run();
274                         } catch( Error e ) {
275                             errorCaughtLatch.countDown();
276                         }
277                     }
278                 });
279              }
280         };
281
282         NotificationManager<TestListener<Integer>, Integer> manager =
283                 new QueuedNotificationManager<>( queueExecutor, new TestNotifier<>(),
284                 10, "TestMgr" );
285
286         TestListener<Integer> listener = new TestListener<>( 2, 1 );
287         listener.jvmError = new Error( "mock" );
288
289         manager.submitNotification( listener, 1 );
290
291         assertEquals( "JVM Error caught", true, Uninterruptibles.awaitUninterruptibly(
292                                                        errorCaughtLatch, 5, TimeUnit.SECONDS ) );
293
294         manager.submitNotification( listener, 2 );
295
296         listener.verifyNotifications();
297     }
298 }