/*
 * Copyright (c) 2014 Brocade Communications Systems, Inc. and others. All rights reserved.
 *
 * This program and the accompanying materials are made available under the
 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
 * and is available at http://www.eclipse.org/legal/epl-v10.html
 */
8 package org.opendaylight.yangtools.util.concurrent;
10 import static org.junit.Assert.assertEquals;
11 import static org.junit.Assert.fail;
14 import java.util.concurrent.ConcurrentHashMap;
15 import java.util.concurrent.ConcurrentMap;
16 import java.util.concurrent.CountDownLatch;
17 import java.util.concurrent.ExecutorService;
18 import java.util.concurrent.RejectedExecutionException;
19 import java.util.concurrent.TimeUnit;
20 import java.util.concurrent.atomic.AtomicLong;
21 import java.util.concurrent.atomic.AtomicReference;
23 import org.junit.After;
24 import org.junit.Test;
26 import com.google.common.base.Stopwatch;
/**
 * Tests various ThreadPoolExecutor implementations.
 *
 * @author Thomas Pantelis
 */
33 public class ThreadPoolExecutorTest {
// Executor currently under test. It is assigned by the tests in this class and is the
// instance that tearDown() shuts down.
35 private ExecutorService executor;
38 public void tearDown() {
39 if( executor != null ) {
40 executor.shutdownNow();
45 public void testFastThreadPoolExecution() throws Exception {
47 testThreadPoolExecution(
48 SpecialExecutors.newBoundedFastThreadPool( 50, 100000, "TestPool" ),
49 100000, "TestPool", 0 );
52 @Test(expected=RejectedExecutionException.class)
53 public void testFastThreadPoolRejectingTask() throws Exception {
55 executor = SpecialExecutors.newBoundedFastThreadPool( 1, 1, "TestPool" );
57 for( int i = 0; i < 5; i++ ) {
58 executor.execute( new Task( null, null, null, null,
59 TimeUnit.MICROSECONDS.convert( 5, TimeUnit.SECONDS ) ) );
// Exercises the pool returned by SpecialExecutors.newBlockingBoundedFastThreadPool( 2, 1, "TestPool" )
// through the shared testThreadPoolExecution helper.
// NOTE(review): the remaining arguments of the testThreadPoolExecution call (task count,
// expected thread prefix, task delay) are not visible in this listing — confirm against the full file.
64 public void testBlockingFastThreadPoolExecution() throws Exception {
66 // With a queue capacity of 1, it should block at some point.
67 testThreadPoolExecution(
68 SpecialExecutors.newBlockingBoundedFastThreadPool( 2, 1, "TestPool" ),
73 public void testCachedThreadPoolExecution() throws Exception {
75 testThreadPoolExecution(
76 SpecialExecutors.newBoundedCachedThreadPool( 10, 100000, "TestPool" ),
77 100000, "TestPool", 0 );
80 @Test(expected=RejectedExecutionException.class)
81 public void testCachedThreadRejectingTask() throws Exception {
83 ExecutorService executor = SpecialExecutors.newBoundedCachedThreadPool( 1, 1, "TestPool" );
85 for( int i = 0; i < 5; i++ ) {
86 executor.execute( new Task( null, null, null, null,
87 TimeUnit.MICROSECONDS.convert( 5, TimeUnit.SECONDS ) ) );
// Exercises the pool returned by SpecialExecutors.newBlockingBoundedCachedThreadPool( 2, 1, "TestPool" )
// through the shared testThreadPoolExecution helper.
// NOTE(review): the remaining arguments of the testThreadPoolExecution call (task count,
// expected thread prefix, task delay) are not visible in this listing — confirm against the full file.
92 public void testBlockingCachedThreadPoolExecution() throws Exception {
94 testThreadPoolExecution(
95 SpecialExecutors.newBlockingBoundedCachedThreadPool( 2, 1, "TestPool" ),
// Shared driver for the pool tests: submits numTasksToRun Task instances to the given
// executor, waits up to 15 seconds for them to finish, rethrows any assertion failure
// recorded by a task, and prints per-thread task counts plus the elapsed time.
//
// Parameters:
//   executor        - pool under test; also stored in the field of the same name
//   numTasksToRun   - number of tasks to submit and the initial count of the completion latch
//   expThreadPrefix - passed to each Task as the expected worker thread name prefix
//   taskDelay       - passed to each Task as its delay
//
// NOTE(review): several lines of this method are not visible in this listing (the code between
// the stopwatch creation and the submission loop, the condition guarding fail(), and closing
// braces) — confirm the exact control flow against the full file.
99 void testThreadPoolExecution( final ExecutorService executor,
100 final int numTasksToRun, final String expThreadPrefix, final long taskDelay ) throws Exception {
// Remember the executor in the instance field.
102 this.executor = executor;
104 System.out.println( "\nTesting " + executor.getClass().getSimpleName() + " with " +
105 numTasksToRun + " tasks." );
// Latch counted down once per finished task; map of task counts keyed by worker thread;
// holder for an AssertionError raised inside a task.
107 final CountDownLatch tasksRunLatch = new CountDownLatch( numTasksToRun );
108 final ConcurrentMap<Thread, AtomicLong> taskCountPerThread = new ConcurrentHashMap<>();
109 final AtomicReference<AssertionError> threadError = new AtomicReference<>();
111 Stopwatch stopWatch = Stopwatch.createStarted();
// Submit one Task per iteration.
116 for( int i = 0; i < numTasksToRun; i++ ) {
118 // Uninterruptibles.sleepUninterruptibly( 20, TimeUnit.MICROSECONDS );
121 executor.execute( new Task( tasksRunLatch, taskCountPerThread,
122 threadError, expThreadPrefix, taskDelay ) );
// Wait at most 15 seconds for every task to count the latch down.
127 boolean done = tasksRunLatch.await( 15, TimeUnit.SECONDS );
// Failure message reports how many tasks completed (submitted minus remaining latch count).
// NOTE(review): presumably guarded by a check on 'done' that is not visible here — confirm.
132 fail( (numTasksToRun - tasksRunLatch.getCount()) + " tasks out of " +
133 numTasksToRun + " executed" );
// Rethrow an assertion failure captured from a task.
136 if( threadError.get() != null ) {
137 throw threadError.get();
// Report how many distinct threads ran tasks and how many tasks each one ran.
140 System.out.println( taskCountPerThread.size() + " threads used:" );
141 for( Map.Entry<Thread, AtomicLong> e : taskCountPerThread.entrySet() ) {
142 System.out.println( " " + e.getKey().getName() + " - " + e.getValue() + " tasks" );
145 System.out.println( "\n" + executor );
146 System.out.println( "\nElapsed time: " + stopWatch );
147 System.out.println();
// Runnable submitted to the executors under test. Depending on which fields are non-null it
// checks the worker thread's name prefix, maintains a per-thread task counter, records an
// AssertionError raised by the name check, and counts down a latch when it finishes.
// NOTE(review): several lines of this class are not visible in this listing (the declaration
// of 'delay', the header of the method containing the logic below — presumably run() — and
// some statements and closing braces). Confirm details against the full file.
150 static class Task implements Runnable {
151 final CountDownLatch tasksRunLatch;
152 final CountDownLatch blockLatch;
153 final ConcurrentMap<Thread, AtomicLong> taskCountPerThread;
154 final AtomicReference<AssertionError> threadError;
155 final String expThreadPrefix;
// Constructor taking the bookkeeping collaborators, the expected thread name prefix and a
// delay. The logic below null-checks each reference field before using it.
158 Task( CountDownLatch tasksRunLatch, ConcurrentMap<Thread, AtomicLong> taskCountPerThread,
159 AtomicReference<AssertionError> threadError, String expThreadPrefix, long delay ) {
160 this.tasksRunLatch = tasksRunLatch;
161 this.taskCountPerThread = taskCountPerThread;
162 this.threadError = threadError;
163 this.expThreadPrefix = expThreadPrefix;
// Constructor for a task driven by a blocking latch; the counting map, error holder and
// expected prefix are explicitly set to null.
168 Task( CountDownLatch tasksRunLatch, CountDownLatch blockLatch ) {
169 this.tasksRunLatch = tasksRunLatch;
170 this.blockLatch = blockLatch;
171 this.taskCountPerThread = null;
172 this.threadError = null;
173 this.expThreadPrefix = null;
// Sleep for 'delay' microseconds.
// NOTE(review): the condition selecting this branch is not visible here — confirm.
182 TimeUnit.MICROSECONDS.sleep( delay );
// NOTE(review): the body of this branch is not visible; presumably it waits on blockLatch — confirm.
183 } else if( blockLatch != null ) {
// NOTE(review): the InterruptedException is swallowed and the thread's interrupt flag is not restored.
186 } catch( InterruptedException e ) {}
// Raises an AssertionError when the current thread's name does not start with the expected prefix.
188 if( expThreadPrefix != null ) {
189 assertEquals( "Thread name starts with " + expThreadPrefix, true,
190 Thread.currentThread().getName().startsWith( expThreadPrefix ) );
// Look up the counter for the current thread, creating and registering one if absent.
193 if( taskCountPerThread != null ) {
194 AtomicLong count = taskCountPerThread.get( Thread.currentThread() );
195 if( count == null ) {
196 count = new AtomicLong( 0 );
// NOTE(review): the handling of a non-null 'prev' is not visible here; presumably the
// already-registered counter is used instead — confirm.
197 AtomicLong prev = taskCountPerThread.putIfAbsent( Thread.currentThread(), count );
203 count.incrementAndGet();
// Record the assertion failure in threadError when a holder was supplied.
206 } catch( AssertionError e ) {
207 if( threadError != null ) {
208 threadError.set( e );
// Signal completion of this task on the latch, when one was supplied.
211 if( tasksRunLatch != null ) {
212 tasksRunLatch.countDown();