684797d4daf0f001d80852565c908dbf48ee5d6d
[yangtools.git] / common / util / src / test / java / org / opendaylight / yangtools / util / concurrent / QueuedNotificationManagerTest.java
1 /*
2  * Copyright (c) 2014 Brocade Communications Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.yangtools.util.concurrent;
10
11 import static org.junit.Assert.fail;
12 import static org.junit.Assert.assertEquals;
13
14 import java.util.Arrays;
15 import java.util.Collections;
16 import java.util.List;
17 import java.util.concurrent.CountDownLatch;
18 import java.util.concurrent.ExecutorService;
19 import java.util.concurrent.Executors;
20 import java.util.concurrent.LinkedBlockingQueue;
21 import java.util.concurrent.ThreadPoolExecutor;
22 import java.util.concurrent.TimeUnit;
23
24 import org.junit.After;
25 import org.junit.Test;
26
27 import com.google.common.base.Stopwatch;
28 import com.google.common.collect.Lists;
29 import com.google.common.util.concurrent.Uninterruptibles;
30
31 /**
32  * Unit tests for QueuedNotificationManager.
33  *
34  * @author Thomas Pantelis
35  */
36 public class QueuedNotificationManagerTest {
37
38     static class TestListener<N> {
39
40         private final List<N> actual;
41         private volatile int expCount;
42         private volatile CountDownLatch latch;
43         volatile long sleepTime = 0;
44         volatile RuntimeException runtimeEx;
45         volatile Error jvmError;
46         boolean cacheNotifications = true;
47         String name;
48
49         TestListener( int expCount, int id ) {
50             name = "TestListener " + id;
51             actual = Collections.synchronizedList( Lists.newArrayListWithCapacity( expCount ) );
52             reset( expCount );
53         }
54
55         void reset( int expCount ) {
56             this.expCount = expCount;
57             latch = new CountDownLatch( expCount );
58             actual.clear();
59         }
60
61         void onNotification( N data ) {
62
63             try {
64                 if (sleepTime > 0) {
65                     Uninterruptibles.sleepUninterruptibly( sleepTime, TimeUnit.MILLISECONDS );
66                 }
67
68                 if (cacheNotifications) {
69                     actual.add( data );
70                 }
71
72                 RuntimeException localRuntimeEx = runtimeEx;
73                 if (localRuntimeEx != null) {
74                     runtimeEx = null;
75                     throw localRuntimeEx;
76                 }
77
78                 Error localJvmError = jvmError;
79                 if (localJvmError != null) {
80                     jvmError = null;
81                     throw localJvmError;
82                 }
83
84             } finally {
85                 latch.countDown();
86             }
87         }
88
89         void verifyNotifications() {
90             boolean done = Uninterruptibles.awaitUninterruptibly( latch, 10, TimeUnit.SECONDS );
91             if (!done) {
92                 long actualCount = latch.getCount();
93                 fail( name + ": Received " + (expCount - actualCount) +
94                       " notifications. Expected " + expCount );
95             }
96         }
97
98         void verifyNotifications( List<N> expected ) {
99             verifyNotifications();
100             assertEquals( name + ": Notifications", Lists.newArrayList( expected ), actual );
101         }
102
103         // Implement bad hashCode/equals methods to verify it doesn't screw up the
104         // QueuedNotificationManager as it should use reference identity.
105         @Override
106         public int hashCode(){
107             return 1;
108         }
109
110         @Override
111         public boolean equals( Object obj ){
112             TestListener<?> other = (TestListener<?>) obj;
113             return other != null;
114         }
115     }
116
117     static class TestListener2<N> extends TestListener<N> {
118         TestListener2( int expCount, int id  ) {
119             super(expCount, id);
120         }
121     }
122
123     static class TestListener3<N> extends TestListener<N> {
124         TestListener3( int expCount, int id ) {
125             super(expCount, id);
126         }
127     }
128
129     static class TestNotifier<N> implements QueuedNotificationManager.Invoker<TestListener<N>,N> {
130
131         @Override
132         public void invokeListener( TestListener<N> listener, N notification ) {
133             listener.onNotification( notification );
134         }
135     }
136
137     private ExecutorService queueExecutor;
138
139     @After
140     public void tearDown() {
141         if (queueExecutor != null) {
142             queueExecutor.shutdownNow();
143         }
144     }
145
146     @Test(timeout=10000)
147     public void testNotificationsWithSingleListener() {
148
149         queueExecutor = Executors.newFixedThreadPool( 2 );
150         NotificationManager<TestListener<Integer>, Integer> manager =
151                 new QueuedNotificationManager<>( queueExecutor, new TestNotifier<>(),
152                 10, "TestMgr" );
153
154         int initialCount = 6;
155         int nNotifications = 100;
156
157         TestListener<Integer> listener = new TestListener<>( nNotifications, 1 );
158         listener.sleepTime = 20;
159
160         manager.submitNotifications( listener, Arrays.asList( 1, 2 ) );
161         manager.submitNotification( listener, 3 );
162         manager.submitNotifications( listener, Arrays.asList( 4, 5 ) );
163         manager.submitNotification( listener, 6 );
164
165         manager.submitNotifications( null, Collections.emptyList() );
166         manager.submitNotifications( listener, null );
167         manager.submitNotification( listener, null );
168
169         Uninterruptibles.sleepUninterruptibly( 100, TimeUnit.MILLISECONDS );
170
171         listener.sleepTime = 0;
172
173         List<Integer> expNotifications = Lists.newArrayListWithCapacity( nNotifications );
174         expNotifications.addAll( Arrays.asList( 1, 2, 3, 4, 5, 6 ) );
175         for (int i = 1; i <= nNotifications - initialCount; i++) {
176             Integer v = Integer.valueOf( initialCount + i );
177             expNotifications.add( v );
178             manager.submitNotification( listener, v );
179         }
180
181         listener.verifyNotifications( expNotifications );
182     }
183
184     @Test
185     public void testNotificationsWithMultipleListeners() {
186
187         int nListeners = 10;
188         queueExecutor = Executors.newFixedThreadPool( nListeners );
189         final ExecutorService stagingExecutor = Executors.newFixedThreadPool( nListeners );
190         final NotificationManager<TestListener<Integer>, Integer> manager =
191                 new QueuedNotificationManager<>( queueExecutor, new TestNotifier<>(),
192                 5000, "TestMgr" );
193
194         final int nNotifications = 100000;
195
196         System.out.println( "Testing " + nListeners + " listeners with " + nNotifications +
197                             " notifications each..." );
198
199         final Integer[] notifications = new Integer[nNotifications];
200         for (int i = 1; i <= nNotifications; i++) {
201             notifications[i-1] = Integer.valueOf( i );
202         }
203
204         Stopwatch stopWatch = Stopwatch.createStarted();
205
206         List<TestListener<Integer>> listeners = Lists.newArrayList();
207         for (int i = 1; i <= nListeners; i++) {
208             final TestListener<Integer> listener =
209                     i == 2 ? new TestListener2<>(nNotifications, i) :
210                     i == 3 ? new TestListener3<>(nNotifications, i) :
211                             new TestListener<>(nNotifications, i);
212             listeners.add( listener );
213
214             new Thread( new Runnable() {
215                 @Override
216                 public void run() {
217                     for (int j = 1; j <= nNotifications; j++) {
218                         final Integer n = notifications[j-1];
219                         stagingExecutor.execute( new Runnable() {
220                             @Override
221                             public void run() {
222                                 manager.submitNotification( listener, n );
223                             }
224                         } );
225                     }
226                 }
227             } ).start();
228         }
229
230         try {
231             for (TestListener<Integer> listener: listeners) {
232                 listener.verifyNotifications();
233                 System.out.println( listener.name + " succeeded" );
234             }
235         } finally {
236             stagingExecutor.shutdownNow();
237         }
238
239         stopWatch.stop();
240
241         System.out.println( "Elapsed time: " + stopWatch );
242         System.out.println( queueExecutor );
243     }
244
245     @Test(timeout=10000)
246     public void testNotificationsWithListenerRuntimeEx() {
247
248         queueExecutor = Executors.newFixedThreadPool( 1 );
249         NotificationManager<TestListener<Integer>, Integer> manager =
250                 new QueuedNotificationManager<>( queueExecutor, new TestNotifier<>(),
251                 10, "TestMgr" );
252
253
254         TestListener<Integer> listener = new TestListener<>( 2, 1 );
255         listener.runtimeEx = new RuntimeException( "mock" );
256
257         manager.submitNotification( listener, 1 );
258         manager.submitNotification( listener, 2 );
259
260         listener.verifyNotifications();
261     }
262
263     @Test(timeout=10000)
264     public void testNotificationsWithListenerJVMError() {
265
266         final CountDownLatch errorCaughtLatch = new CountDownLatch( 1 );
267         queueExecutor = new ThreadPoolExecutor( 1, 1, 0, TimeUnit.SECONDS,
268                 new LinkedBlockingQueue<>() ) {
269              @Override
270              public void execute( final Runnable command ) {
271                  super.execute( new Runnable() {
272                     @Override
273                     public void run() {
274                         try {
275                             command.run();
276                         } catch( Error e ) {
277                             errorCaughtLatch.countDown();
278                         }
279                     }
280                 });
281              }
282         };
283
284         NotificationManager<TestListener<Integer>, Integer> manager =
285                 new QueuedNotificationManager<>( queueExecutor, new TestNotifier<>(),
286                 10, "TestMgr" );
287
288         TestListener<Integer> listener = new TestListener<>( 2, 1 );
289         listener.jvmError = new Error( "mock" );
290
291         manager.submitNotification( listener, 1 );
292
293         assertEquals( "JVM Error caught", true, Uninterruptibles.awaitUninterruptibly(
294                                                        errorCaughtLatch, 5, TimeUnit.SECONDS ) );
295
296         manager.submitNotification( listener, 2 );
297
298         listener.verifyNotifications();
299     }
300 }