2 * Copyright (c) 2014 Brocade Communications Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.yangtools.util.concurrent;
11 import static org.junit.Assert.fail;
12 import static org.junit.Assert.assertEquals;
14 import java.util.Arrays;
15 import java.util.Collections;
16 import java.util.List;
17 import java.util.concurrent.CountDownLatch;
18 import java.util.concurrent.ExecutorService;
19 import java.util.concurrent.Executors;
20 import java.util.concurrent.LinkedBlockingQueue;
21 import java.util.concurrent.ThreadPoolExecutor;
22 import java.util.concurrent.TimeUnit;
24 import org.junit.After;
25 import org.junit.Test;
27 import com.google.common.base.Stopwatch;
28 import com.google.common.collect.Lists;
29 import com.google.common.util.concurrent.Uninterruptibles;
32 * Unit tests for QueuedNotificationManager.
34 * @author Thomas Pantelis
36 public class QueuedNotificationManagerTest {
/**
 * Listener used by the tests in this class. It holds a latch sized to the
 * expected notification count, can be slowed down via {@code sleepTime}, and
 * can be armed with a RuntimeException or Error for the failure tests.
 *
 * NOTE(review): this listing appears to be missing lines (closing braces,
 * annotations, the declaration of {@code name}, and parts of some method
 * bodies), so the comments below describe only what is visible.
 */
38 static class TestListener<N> {
// Notifications recorded by this listener; the constructor wraps it in a synchronized list.
40 private final List<N> actual;
// Expected notification count and the latch sized to it; both are assigned in reset().
41 private volatile int expCount;
42 private volatile CountDownLatch latch;
// Delay in milliseconds passed to the sleep in onNotification(); tests assign it directly.
43 volatile long sleepTime = 0;
// Optional failures read by onNotification(); presumably thrown when non-null
// (the throw statements are not visible here -- TODO confirm).
44 volatile RuntimeException runtimeEx;
45 volatile Error jvmError;
// Guards a branch in onNotification(); presumably controls whether notifications
// are stored in 'actual' -- TODO confirm against the full source.
46 boolean cacheNotifications = true;
// Builds the display name from the id and presizes the synchronized 'actual' list to expCount.
49 TestListener( int expCount, int id ) {
50 name = "TestListener " + id;
51 actual = Collections.synchronizedList( Lists.newArrayListWithCapacity( expCount ) );
// Stores the expected count and creates a fresh latch initialised to that count.
55 void reset( int expCount ) {
56 this.expCount = expCount;
57 latch = new CountDownLatch( expCount );
// Handles one notification. Visible steps: the sleep, the cacheNotifications check,
// and the runtimeEx / jvmError checks. The latch is presumably counted down here as
// well, but that call is not visible in this listing.
61 void onNotification( N data ) {
// Uninterruptible sleep for sleepTime milliseconds (any guard around it is not visible).
65 Uninterruptibles.sleepUninterruptibly( sleepTime, TimeUnit.MILLISECONDS );
68 if (cacheNotifications) {
// Copy each volatile field to a local so the null check and any later use see the same value.
72 RuntimeException localRuntimeEx = runtimeEx;
73 if (localRuntimeEx != null) {
78 Error localJvmError = jvmError;
79 if (localJvmError != null) {
// Waits up to 10 seconds for the latch to reach zero. The fail() message reports the
// received count as expCount minus the remaining latch count; it is presumably only
// reached when the wait timed out (the guarding condition is not visible).
89 void verifyNotifications() {
90 boolean done = Uninterruptibles.awaitUninterruptibly( latch, 10, TimeUnit.SECONDS );
92 long actualCount = latch.getCount();
93 fail( name + ": Received " + (expCount - actualCount) +
94 " notifications. Expected " + expCount );
// Runs the count check above, then asserts that 'actual' equals a copy of the
// expected list (List equality, so element order matters).
98 void verifyNotifications( List<N> expected ) {
99 verifyNotifications();
100 assertEquals( name + ": Notifications", Lists.newArrayList( expected ), actual );
103 // Implement bad hashCode/equals methods to verify it doesn't screw up the
104 // QueuedNotificationManager as it should use reference identity.
// (hashCode body is not visible in this listing.)
106 public int hashCode(){
// Deliberately sloppy: the unchecked cast means any non-null TestListener compares
// equal, and a non-TestListener argument would raise ClassCastException.
111 public boolean equals( Object obj ){
112 TestListener<?> other = (TestListener<?>) obj;
113 return other != null;
// Distinct subclass of TestListener taking the same (expCount, id) constructor
// arguments; the multi-listener test uses it for listener number 2.
// NOTE(review): the constructor body is not visible -- presumably delegates to super.
117 static class TestListener2<N> extends TestListener<N> {
118 TestListener2( int expCount, int id ) {
// Distinct subclass of TestListener taking the same (expCount, id) constructor
// arguments; the multi-listener test uses it for listener number 3.
// NOTE(review): the constructor body is not visible -- presumably delegates to super.
123 static class TestListener3<N> extends TestListener<N> {
124 TestListener3( int expCount, int id ) {
// Invoker implementation handed to QueuedNotificationManager by the tests: it
// delivers each notification by calling the listener's onNotification().
129 static class TestNotifier<N> implements QueuedNotificationManager.Invoker<TestListener<N>,N> {
132 public void invokeListener( TestListener<N> listener, N notification ) {
133 listener.onNotification( notification );
// Executor given to the manager under test; each test method assigns its own instance.
137 private ExecutorService queueExecutor;
// Shuts the executor down (shutdownNow) if the test created one. The org.junit.After
// import suggests this runs after every test, though the annotation is not visible here.
140 public void tearDown() {
141 if (queueExecutor != null) {
142 queueExecutor.shutdownNow();
// Checks that a single listener receives the values 1..100 in submission order,
// using a mix of single-item and batch submissions.
147 public void testNotificationsWithSingleListener() {
// Two-thread executor for the manager; the remaining constructor arguments are on
// a line not visible in this listing.
149 queueExecutor = Executors.newFixedThreadPool( 2 );
150 NotificationManager<TestListener<Integer>, Integer> manager =
151 new QueuedNotificationManager<>( queueExecutor, new TestNotifier<>(),
// initialCount = size of the first burst; nNotifications = total expected.
154 int initialCount = 6;
155 int nNotifications = 100;
157 TestListener<Integer> listener = new TestListener<>( nNotifications, 1 );
// Slow the listener (20 ms per notification) while the first burst is submitted.
158 listener.sleepTime = 20;
// First burst: values 1..6 via alternating batch and single submissions.
160 manager.submitNotifications( listener, Arrays.asList( 1, 2 ) );
161 manager.submitNotification( listener, 3 );
162 manager.submitNotifications( listener, Arrays.asList( 4, 5 ) );
163 manager.submitNotification( listener, 6 );
// Null listener / null notification inputs. None of these appear in the expected
// list below, so the test relies on the manager ignoring them.
165 manager.submitNotifications( null, Collections.emptyList() );
166 manager.submitNotifications( listener, null );
167 manager.submitNotification( listener, null );
// Pause 100 ms with the listener still slowed, then remove the delay.
169 Uninterruptibles.sleepUninterruptibly( 100, TimeUnit.MILLISECONDS );
171 listener.sleepTime = 0;
// Build the expected list: the first six values, then 7..100 which are also
// submitted one at a time inside the loop.
173 List<Integer> expNotifications = Lists.newArrayListWithCapacity( nNotifications );
174 expNotifications.addAll( Arrays.asList( 1, 2, 3, 4, 5, 6 ) );
175 for (int i = 1; i <= nNotifications - initialCount; i++) {
176 Integer v = Integer.valueOf( initialCount + i );
177 expNotifications.add( v );
178 manager.submitNotification( listener, v );
// Waits for the expected count, then compares the received list with the expected one.
181 listener.verifyNotifications( expNotifications );
// Stress test: each of nListeners listeners is sent 100000 notifications, submitted
// through a separate staging executor. Only the received count per listener is
// verified (the no-argument verifyNotifications() is used), not the order.
// NOTE(review): the declaration of nListeners and several closing/runnable lines
// are not visible in this listing.
185 public void testNotificationsWithMultipleListeners() {
// Both pools are sized to the number of listeners.
188 queueExecutor = Executors.newFixedThreadPool( nListeners );
189 final ExecutorService stagingExecutor = Executors.newFixedThreadPool( nListeners );
190 final NotificationManager<TestListener<Integer>, Integer> manager =
191 new QueuedNotificationManager<>( queueExecutor, new TestNotifier<>(),
194 final int nNotifications = 100000;
196 System.out.println( "Testing " + nListeners + " listeners with " + nNotifications +
197 " notifications each..." );
// Pre-box the values 1..nNotifications once so every listener shares the same array.
199 final Integer[] notifications = new Integer[nNotifications];
200 for (int i = 1; i <= nNotifications; i++) {
201 notifications[i-1] = Integer.valueOf( i );
// Timer for the elapsed-time report printed at the end.
204 Stopwatch stopWatch = Stopwatch.createStarted();
206 List<TestListener<Integer>> listeners = Lists.newArrayList();
207 for (int i = 1; i <= nListeners; i++) {
// Listeners 2 and 3 use the TestListener2 / TestListener3 subclasses; all others
// are plain TestListener instances.
208 final TestListener<Integer> listener =
209 i == 2 ? new TestListener2<>(nNotifications, i) :
210 i == 3 ? new TestListener3<>(nNotifications, i) :
211 new TestListener<>(nNotifications, i);
212 listeners.add( listener );
// A dedicated thread per listener hands that listener's notifications to the
// staging executor, one task per notification; each task calls submitNotification.
214 new Thread( new Runnable() {
217 for (int j = 1; j <= nNotifications; j++) {
218 final Integer n = notifications[j-1];
219 stagingExecutor.execute( new Runnable() {
222 manager.submitNotification( listener, n );
// Wait for every listener to reach its expected count (fails after the 10 s
// timeout inside verifyNotifications()).
231 for (TestListener<Integer> listener: listeners) {
232 listener.verifyNotifications();
233 System.out.println( listener.name + " succeeded" );
// Stop the staging pool; presumably inside a finally block (not visible) -- TODO confirm.
236 stagingExecutor.shutdownNow();
// Report elapsed time and the queue executor's toString() state.
241 System.out.println( "Elapsed time: " + stopWatch );
242 System.out.println( queueExecutor );
// Arms the listener with a RuntimeException and checks that both submitted
// notifications are still counted, i.e. delivery continues after the first one.
246 public void testNotificationsWithListenerRuntimeEx() {
// Single-thread executor; remaining constructor arguments are not visible in this listing.
248 queueExecutor = Executors.newFixedThreadPool( 1 );
249 NotificationManager<TestListener<Integer>, Integer> manager =
250 new QueuedNotificationManager<>( queueExecutor, new TestNotifier<>(),
// Expected count of 2; runtimeEx is the field onNotification() reads on each call.
254 TestListener<Integer> listener = new TestListener<>( 2, 1 );
255 listener.runtimeEx = new RuntimeException( "mock" );
257 manager.submitNotification( listener, 1 );
258 manager.submitNotification( listener, 2 );
// Count-only verification: waits up to 10 s for the latch.
260 listener.verifyNotifications();
// Arms the listener with an Error and checks that the manager still delivers a
// later notification after the Error has surfaced in the executor.
264 public void testNotificationsWithListenerJVMError() {
// Counted down by the wrapping Runnable below; presumably from a catch of the
// Error thrown by the wrapped command (the try/catch is not visible -- TODO confirm).
266 final CountDownLatch errorCaughtLatch = new CountDownLatch( 1 );
// Single-thread pool whose execute() wraps every submitted command in another Runnable.
267 queueExecutor = new ThreadPoolExecutor( 1, 1, 0, TimeUnit.SECONDS,
268 new LinkedBlockingQueue<>() ) {
270 public void execute( final Runnable command ) {
271 super.execute( new Runnable() {
277 errorCaughtLatch.countDown();
// Remaining constructor arguments are not visible in this listing.
284 NotificationManager<TestListener<Integer>, Integer> manager =
285 new QueuedNotificationManager<>( queueExecutor, new TestNotifier<>(),
// Expected count of 2; jvmError is the field onNotification() reads on each call.
288 TestListener<Integer> listener = new TestListener<>( 2, 1 );
289 listener.jvmError = new Error( "mock" );
291 manager.submitNotification( listener, 1 );
// Wait up to 5 s for the wrapper to signal before sending the second notification.
293 assertEquals( "JVM Error caught", true, Uninterruptibles.awaitUninterruptibly(
294 errorCaughtLatch, 5, TimeUnit.SECONDS ) );
296 manager.submitNotification( listener, 2 );
// Count-only verification: both notifications must have been counted.
298 listener.verifyNotifications();