2 * Copyright (c) 2014 Brocade Communications Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.yangtools.util.concurrent;
10 import static org.junit.Assert.assertEquals;
11 import static org.junit.Assert.fail;
14 import java.util.concurrent.ConcurrentHashMap;
15 import java.util.concurrent.ConcurrentMap;
16 import java.util.concurrent.CountDownLatch;
17 import java.util.concurrent.ExecutorService;
18 import java.util.concurrent.RejectedExecutionException;
19 import java.util.concurrent.TimeUnit;
20 import java.util.concurrent.atomic.AtomicLong;
21 import java.util.concurrent.atomic.AtomicReference;
23 import org.junit.After;
24 import org.junit.Test;
26 import com.google.common.base.Stopwatch;
29 * Tests various ThreadPoolExecutor implementations.
31 * @author Thomas Pantelis
33 public class ThreadPoolExecutorTest {
35 private ExecutorService executor;
38 public void tearDown() {
39 if( executor != null ) {
40 executor.shutdownNow();
45 public void testFastThreadPoolExecution() throws Exception {
47 testThreadPoolExecution(
48 SpecialExecutors.newBoundedFastThreadPool( 50, 100000, "TestPool" ),
49 100000, "TestPool", 0 );
52 @Test(expected=RejectedExecutionException.class)
53 public void testFastThreadPoolRejectingTask() throws Exception {
55 executor = SpecialExecutors.newBoundedFastThreadPool( 1, 1, "TestPool" );
57 for( int i = 0; i < 5; i++ ) {
58 executor.execute( new Task( null, null, null, null,
59 TimeUnit.MICROSECONDS.convert( 5, TimeUnit.SECONDS ) ) );
64 public void testBlockingFastThreadPoolExecution() throws Exception {
66 // With a queue capacity of 1, it should block at some point.
67 testThreadPoolExecution(
68 SpecialExecutors.newBlockingBoundedFastThreadPool( 2, 1, "TestPool" ),
73 public void testCachedThreadPoolExecution() throws Exception {
75 testThreadPoolExecution(
76 SpecialExecutors.newBoundedCachedThreadPool( 10, 100000, "TestPool" ),
77 100000, "TestPool", 0 );
80 @Test(expected=RejectedExecutionException.class)
81 public void testCachedThreadRejectingTask() throws Exception {
83 ExecutorService executor = SpecialExecutors.newBoundedCachedThreadPool( 1, 1, "TestPool" );
85 for( int i = 0; i < 5; i++ ) {
86 executor.execute( new Task( null, null, null, null,
87 TimeUnit.MICROSECONDS.convert( 5, TimeUnit.SECONDS ) ) );
92 public void testBlockingCachedThreadPoolExecution() throws Exception {
94 testThreadPoolExecution(
95 SpecialExecutors.newBlockingBoundedCachedThreadPool( 2, 1, "TestPool" ),
99 void testThreadPoolExecution( final ExecutorService executor,
100 final int numTasksToRun, final String expThreadPrefix, final long taskDelay ) throws Exception {
102 this.executor = executor;
104 System.out.println( "\nTesting " + executor.getClass().getSimpleName() + " with " +
105 numTasksToRun + " tasks." );
107 final CountDownLatch tasksRunLatch = new CountDownLatch( numTasksToRun );
108 final ConcurrentMap<Thread, AtomicLong> taskCountPerThread = new ConcurrentHashMap<>();
109 final AtomicReference<AssertionError> threadError = new AtomicReference<>();
111 Stopwatch stopWatch = new Stopwatch();
117 for( int i = 0; i < numTasksToRun; i++ ) {
119 // Uninterruptibles.sleepUninterruptibly( 20, TimeUnit.MICROSECONDS );
122 executor.execute( new Task( tasksRunLatch, taskCountPerThread,
123 threadError, expThreadPrefix, taskDelay ) );
128 boolean done = tasksRunLatch.await( 15, TimeUnit.SECONDS );
133 fail( (numTasksToRun - tasksRunLatch.getCount()) + " tasks out of " +
134 numTasksToRun + " executed" );
137 if( threadError.get() != null ) {
138 throw threadError.get();
141 System.out.println( taskCountPerThread.size() + " threads used:" );
142 for( Map.Entry<Thread, AtomicLong> e : taskCountPerThread.entrySet() ) {
143 System.out.println( " " + e.getKey().getName() + " - " + e.getValue() + " tasks" );
146 System.out.println( "\n" + executor );
147 System.out.println( "\nElapsed time: " + stopWatch );
148 System.out.println();
151 static class Task implements Runnable {
152 final CountDownLatch tasksRunLatch;
153 final CountDownLatch blockLatch;
154 final ConcurrentMap<Thread, AtomicLong> taskCountPerThread;
155 final AtomicReference<AssertionError> threadError;
156 final String expThreadPrefix;
159 Task( CountDownLatch tasksRunLatch, ConcurrentMap<Thread, AtomicLong> taskCountPerThread,
160 AtomicReference<AssertionError> threadError, String expThreadPrefix, long delay ) {
161 this.tasksRunLatch = tasksRunLatch;
162 this.taskCountPerThread = taskCountPerThread;
163 this.threadError = threadError;
164 this.expThreadPrefix = expThreadPrefix;
169 Task( CountDownLatch tasksRunLatch, CountDownLatch blockLatch ) {
170 this.tasksRunLatch = tasksRunLatch;
171 this.blockLatch = blockLatch;
172 this.taskCountPerThread = null;
173 this.threadError = null;
174 this.expThreadPrefix = null;
183 TimeUnit.MICROSECONDS.sleep( delay );
184 } else if( blockLatch != null ) {
187 } catch( InterruptedException e ) {}
189 if( expThreadPrefix != null ) {
190 assertEquals( "Thread name starts with " + expThreadPrefix, true,
191 Thread.currentThread().getName().startsWith( expThreadPrefix ) );
194 if( taskCountPerThread != null ) {
195 AtomicLong count = taskCountPerThread.get( Thread.currentThread() );
196 if( count == null ) {
197 count = new AtomicLong( 0 );
198 AtomicLong prev = taskCountPerThread.putIfAbsent( Thread.currentThread(), count );
204 count.incrementAndGet();
207 } catch( AssertionError e ) {
208 if( threadError != null ) {
209 threadError.set( e );
212 if( tasksRunLatch != null ) {
213 tasksRunLatch.countDown();