2 * Copyright (c) 2014 Brocade Communications Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.yangtools.util.concurrent;
11 import static org.junit.Assert.assertEquals;
12 import static org.junit.Assert.fail;
14 import com.google.common.base.Stopwatch;
15 import com.google.common.collect.Lists;
16 import com.google.common.util.concurrent.Uninterruptibles;
17 import java.util.Arrays;
18 import java.util.Collections;
19 import java.util.List;
20 import java.util.concurrent.CountDownLatch;
21 import java.util.concurrent.ExecutorService;
22 import java.util.concurrent.Executors;
23 import java.util.concurrent.LinkedBlockingQueue;
24 import java.util.concurrent.ThreadPoolExecutor;
25 import java.util.concurrent.TimeUnit;
26 import org.junit.After;
27 import org.junit.Test;
30 * Unit tests for QueuedNotificationManager.
32 * @author Thomas Pantelis
34 public class QueuedNotificationManagerTest {
// Listener stub used by the tests in this file: it keeps notifications in a list and offers
// a latch-based wait for an expected number of them.
// NOTE(review): several statements of this class are not visible in this listing (the
// embedded line numbers skip), so the comments below describe only the lines shown.
36 static class TestListener<N> {
// Notifications held by this listener; wrapped in a synchronized list by the constructor.
38 private final List<N> actual;
// Expected notification count; written by reset() and reported by verifyNotifications().
39 private volatile int expCount;
// Awaited by verifyNotifications(); replaced with a fresh latch on every reset().
40 private volatile CountDownLatch latch;
// Delay, in milliseconds, handed to sleepUninterruptibly() in onNotification().
41 volatile long sleepTime = 0;
// Failure-injection hooks: tests assign these directly and onNotification() reads them.
42 volatile RuntimeException runtimeEx;
43 volatile Error jvmError;
// Consulted by onNotification(). NOTE(review): presumably gates whether the notification is
// added to 'actual' - the guarded statement is not visible here.
44 boolean cacheNotifications = true;
// Builds the display name from 'id' and pre-sizes the synchronized backing list to expCount.
47 TestListener( int expCount, int id ) {
48 name = "TestListener " + id;
49 actual = Collections.synchronizedList( Lists.newArrayListWithCapacity( expCount ) );
// Records the new expected count and installs a latch sized to it.
53 void reset( int expCount ) {
54 this.expCount = expCount;
55 latch = new CountDownLatch( expCount );
// Delivery callback; TestNotifier.invokeListener() forwards to it.
// Visible steps: sleep for sleepTime ms, check cacheNotifications, then snapshot runtimeEx
// and jvmError and null-check the snapshots.
// NOTE(review): the bodies of these conditionals and any latch count-down are elided from
// this listing - confirm against the full source.
59 void onNotification( N data ) {
63 Uninterruptibles.sleepUninterruptibly( sleepTime, TimeUnit.MILLISECONDS );
66 if (cacheNotifications) {
// The volatile field is copied to a local so the null check works on a single read.
70 RuntimeException localRuntimeEx = runtimeEx;
71 if (localRuntimeEx != null) {
// Same single-read pattern for the injected Error.
76 Error localJvmError = jvmError;
77 if (localJvmError != null) {
// Waits up to 10 seconds on the latch. The failure message reports the number received as
// expCount minus the latch's remaining count.
// NOTE(review): the condition guarding fail() is not visible; presumably it is '!done'.
87 void verifyNotifications() {
88 boolean done = Uninterruptibles.awaitUninterruptibly( latch, 10, TimeUnit.SECONDS );
90 long actualCount = latch.getCount();
91 fail( name + ": Received " + (expCount - actualCount) +
92 " notifications. Expected " + expCount );
// Runs the count check above, then compares a copy of 'expected' with the recorded list
// using List equality, so element order matters.
96 void verifyNotifications( List<N> expected ) {
97 verifyNotifications();
98 assertEquals( name + ": Notifications", Lists.newArrayList( expected ), actual );
101 // Implement bad hashCode/equals methods to verify it doesn't screw up the
102 // QueuedNotificationManager as it should use reference identity.
104 public int hashCode(){
// Deliberately loose: after the cast, any non-null argument compares equal. An argument that
// is not a TestListener would fail the cast rather than return false.
109 public boolean equals( Object obj ){
110 TestListener<?> other = (TestListener<?>) obj;
111 return other != null;
// Distinct subclass of TestListener with the same constructor signature; used for listener
// number 2 in testNotificationsWithMultipleListeners().
// NOTE(review): the constructor body is not visible in this listing.
115 static class TestListener2<N> extends TestListener<N> {
116 TestListener2( int expCount, int id ) {
// Distinct subclass of TestListener with the same constructor signature; used for listener
// number 3 in testNotificationsWithMultipleListeners().
// NOTE(review): the constructor body is not visible in this listing.
121 static class TestListener3<N> extends TestListener<N> {
122 TestListener3( int expCount, int id ) {
// Invoker implementation handed to QueuedNotificationManager in every test below.
127 static class TestNotifier<N> implements QueuedNotificationManager.Invoker<TestListener<N>,N> {
// Forwards the notification straight to the listener's onNotification() callback.
130 public void invokeListener( TestListener<N> listener, N notification ) {
131 listener.onNotification( notification );
// Executor assigned by each test and passed to the QueuedNotificationManager it builds.
135 private ExecutorService queueExecutor;
// Stops the executor a test created, if any, via shutdownNow().
// NOTE(review): presumably annotated with @After (the class imports org.junit.After); the
// annotation line is not visible in this listing.
138 public void tearDown() {
// Null when no test has assigned an executor.
139 if (queueExecutor != null) {
140 queueExecutor.shutdownNow();
// Submits 100 notifications to one listener, some in batches and some singly, and checks
// that the listener ends up with exactly 1..100 in submission order.
145 public void testNotificationsWithSingleListener() {
147 queueExecutor = Executors.newFixedThreadPool( 2 );
// NOTE(review): the remaining constructor arguments are on lines not visible here.
148 NotificationManager<TestListener<Integer>, Integer> manager =
149 new QueuedNotificationManager<>( queueExecutor, new TestNotifier<>(),
// initialCount matches the six values submitted up front (1..6) below.
152 int initialCount = 6;
153 int nNotifications = 100;
155 TestListener<Integer> listener = new TestListener<>( nNotifications, 1 );
// Slow the listener down while the first six notifications are submitted.
156 listener.sleepTime = 20;
// Mix the batch and single-item submission entry points.
158 manager.submitNotifications( listener, Arrays.asList( 1, 2 ) );
159 manager.submitNotification( listener, 3 );
160 manager.submitNotifications( listener, Arrays.asList( 4, 5 ) );
161 manager.submitNotification( listener, 6 );
// Null listener, null collection and null notification. The expected list built below
// contains only 1..100, so the test passes only if these calls deliver nothing.
163 manager.submitNotifications( null, Collections.emptyList() );
164 manager.submitNotifications( listener, null );
165 manager.submitNotification( listener, null );
// Pause before removing the artificial listener delay.
167 Uninterruptibles.sleepUninterruptibly( 100, TimeUnit.MILLISECONDS );
169 listener.sleepTime = 0;
// Expected sequence: the six initial values followed by 7..100, submitted one at a time.
171 List<Integer> expNotifications = Lists.newArrayListWithCapacity( nNotifications );
172 expNotifications.addAll( Arrays.asList( 1, 2, 3, 4, 5, 6 ) );
173 for (int i = 1; i <= nNotifications - initialCount; i++) {
174 Integer v = Integer.valueOf( initialCount + i );
175 expNotifications.add( v );
176 manager.submitNotification( listener, v );
// Checks both the count (latch wait) and the exact order of what was recorded.
179 listener.verifyNotifications( expNotifications );
// Drives several listeners concurrently with 100000 notifications each and checks that
// every listener reaches its expected count.
// NOTE(review): the declaration of nListeners, the start of each producer thread and the
// surrounding try/finally structure are on lines not visible in this listing.
183 public void testNotificationsWithMultipleListeners() {
// Both pools are sized to the listener count: queueExecutor is handed to the manager, while
// stagingExecutor runs the submitting tasks.
186 queueExecutor = Executors.newFixedThreadPool( nListeners );
187 final ExecutorService stagingExecutor = Executors.newFixedThreadPool( nListeners );
188 final NotificationManager<TestListener<Integer>, Integer> manager =
189 new QueuedNotificationManager<>( queueExecutor, new TestNotifier<>(),
192 final int nNotifications = 100000;
194 System.out.println( "Testing " + nListeners + " listeners with " + nNotifications +
195 " notifications each..." );
// Box the values 1..nNotifications once so all listeners share the same Integer instances.
197 final Integer[] notifications = new Integer[nNotifications];
198 for (int i = 1; i <= nNotifications; i++) {
199 notifications[i-1] = Integer.valueOf( i );
202 Stopwatch stopWatch = Stopwatch.createStarted();
204 List<TestListener<Integer>> listeners = Lists.newArrayList();
205 for (int i = 1; i <= nListeners; i++) {
// Listeners 2 and 3 use the subclasses; every other index uses the base TestListener.
206 final TestListener<Integer> listener =
207 i == 2 ? new TestListener2<>(nNotifications, i) :
208 i == 3 ? new TestListener3<>(nNotifications, i) :
209 new TestListener<>(nNotifications, i);
210 listeners.add( listener );
// One producer thread per listener; each notification is submitted to the manager from a
// task run on stagingExecutor.
212 new Thread( new Runnable() {
215 for (int j = 1; j <= nNotifications; j++) {
216 final Integer n = notifications[j-1];
217 stagingExecutor.execute( new Runnable() {
220 manager.submitNotification( listener, n );
// Count-only verification: the no-argument overload does not compare contents or order.
229 for (TestListener<Integer> listener: listeners) {
230 listener.verifyNotifications();
231 System.out.println( listener.name + " succeeded" );
234 stagingExecutor.shutdownNow();
239 System.out.println( "Elapsed time: " + stopWatch );
240 System.out.println( queueExecutor );
// Injects a RuntimeException into the listener and checks that both submitted
// notifications are still counted within the verification timeout.
244 public void testNotificationsWithListenerRuntimeEx() {
// A single worker thread serves this manager.
246 queueExecutor = Executors.newFixedThreadPool( 1 );
247 NotificationManager<TestListener<Integer>, Integer> manager =
248 new QueuedNotificationManager<>( queueExecutor, new TestNotifier<>(),
// The listener expects two notifications.
252 TestListener<Integer> listener = new TestListener<>( 2, 1 );
// NOTE(review): onNotification() reads this field; presumably it throws it once - the
// throwing statement is not visible in this listing.
253 listener.runtimeEx = new RuntimeException( "mock" );
255 manager.submitNotification( listener, 1 );
256 manager.submitNotification( listener, 2 );
// Count-only check: fails if the latch does not reach zero within 10 seconds.
258 listener.verifyNotifications();
// Injects an Error into the listener, waits until the executor wrapper signals it, then
// submits a second notification and checks that both are counted.
262 public void testNotificationsWithListenerJVMError() {
264 final CountDownLatch errorCaughtLatch = new CountDownLatch( 1 );
// Single-thread pool whose execute() wraps every submitted command in another Runnable.
// NOTE(review): the wrapper's run()/catch lines are not visible; presumably the latch is
// counted down when the wrapped command throws an Error.
265 queueExecutor = new ThreadPoolExecutor( 1, 1, 0, TimeUnit.SECONDS,
266 new LinkedBlockingQueue<>() ) {
268 public void execute( final Runnable command ) {
269 super.execute( new Runnable() {
275 errorCaughtLatch.countDown();
282 NotificationManager<TestListener<Integer>, Integer> manager =
283 new QueuedNotificationManager<>( queueExecutor, new TestNotifier<>(),
// The listener expects two notifications and has an Error armed for onNotification().
286 TestListener<Integer> listener = new TestListener<>( 2, 1 );
287 listener.jvmError = new Error( "mock" );
289 manager.submitNotification( listener, 1 );
// Wait up to 5 seconds for the wrapper's signal before submitting the second notification.
291 assertEquals( "JVM Error caught", true, Uninterruptibles.awaitUninterruptibly(
292 errorCaughtLatch, 5, TimeUnit.SECONDS ) );
294 manager.submitNotification( listener, 2 );
// Count-only check: both notifications must be counted within 10 seconds.
296 listener.verifyNotifications();