2 * Copyright (c) 2014 Brocade Communications Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.yangtools.util.concurrent;
10 import static org.junit.Assert.assertEquals;
11 import static org.junit.Assert.fail;
13 import com.google.common.base.Stopwatch;
15 import java.util.concurrent.ConcurrentHashMap;
16 import java.util.concurrent.ConcurrentMap;
17 import java.util.concurrent.CountDownLatch;
18 import java.util.concurrent.ExecutorService;
19 import java.util.concurrent.RejectedExecutionException;
20 import java.util.concurrent.TimeUnit;
21 import java.util.concurrent.atomic.AtomicLong;
22 import java.util.concurrent.atomic.AtomicReference;
23 import org.junit.After;
24 import org.junit.Test;
27 * Tests various ThreadPoolExecutor implementations.
29 * @author Thomas Pantelis
31 public class ThreadPoolExecutorTest {
33 private ExecutorService executor;
36 public void tearDown() {
37 if (executor != null) {
38 executor.shutdownNow();
43 public void testFastThreadPoolExecution() throws Exception {
45 testThreadPoolExecution(
46 SpecialExecutors.newBoundedFastThreadPool( 50, 100000, "TestPool" ),
47 100000, "TestPool", 0 );
50 @Test(expected=RejectedExecutionException.class)
51 public void testFastThreadPoolRejectingTask() throws Exception {
53 executor = SpecialExecutors.newBoundedFastThreadPool( 1, 1, "TestPool" );
55 for (int i = 0; i < 5; i++) {
56 executor.execute( new Task( null, null, null, null,
57 TimeUnit.MICROSECONDS.convert( 5, TimeUnit.SECONDS ) ) );
62 public void testBlockingFastThreadPoolExecution() throws Exception {
64 // With a queue capacity of 1, it should block at some point.
65 testThreadPoolExecution(
66 SpecialExecutors.newBlockingBoundedFastThreadPool( 2, 1, "TestPool" ),
71 public void testCachedThreadPoolExecution() throws Exception {
73 testThreadPoolExecution(
74 SpecialExecutors.newBoundedCachedThreadPool( 10, 100000, "TestPool" ),
75 100000, "TestPool", 0 );
78 @Test(expected=RejectedExecutionException.class)
79 public void testCachedThreadRejectingTask() throws Exception {
81 ExecutorService executor = SpecialExecutors.newBoundedCachedThreadPool( 1, 1, "TestPool" );
83 for (int i = 0; i < 5; i++) {
84 executor.execute( new Task( null, null, null, null,
85 TimeUnit.MICROSECONDS.convert( 5, TimeUnit.SECONDS ) ) );
90 public void testBlockingCachedThreadPoolExecution() throws Exception {
92 testThreadPoolExecution(
93 SpecialExecutors.newBlockingBoundedCachedThreadPool( 2, 1, "TestPool" ),
97 void testThreadPoolExecution( final ExecutorService executor,
98 final int numTasksToRun, final String expThreadPrefix, final long taskDelay ) throws Exception {
100 this.executor = executor;
102 System.out.println( "\nTesting " + executor.getClass().getSimpleName() + " with " +
103 numTasksToRun + " tasks." );
105 final CountDownLatch tasksRunLatch = new CountDownLatch( numTasksToRun );
106 final ConcurrentMap<Thread, AtomicLong> taskCountPerThread = new ConcurrentHashMap<>();
107 final AtomicReference<AssertionError> threadError = new AtomicReference<>();
109 Stopwatch stopWatch = Stopwatch.createStarted();
114 for (int i = 0; i < numTasksToRun; i++) {
116 // Uninterruptibles.sleepUninterruptibly( 20, TimeUnit.MICROSECONDS );
119 executor.execute( new Task( tasksRunLatch, taskCountPerThread,
120 threadError, expThreadPrefix, taskDelay ) );
125 boolean done = tasksRunLatch.await( 15, TimeUnit.SECONDS );
130 fail( (numTasksToRun - tasksRunLatch.getCount()) + " tasks out of " +
131 numTasksToRun + " executed" );
134 if (threadError.get() != null) {
135 throw threadError.get();
138 System.out.println( taskCountPerThread.size() + " threads used:" );
139 for (Map.Entry<Thread, AtomicLong> e : taskCountPerThread.entrySet()) {
140 System.out.println( " " + e.getKey().getName() + " - " + e.getValue() + " tasks" );
143 System.out.println( "\n" + executor );
144 System.out.println( "\nElapsed time: " + stopWatch );
145 System.out.println();
148 static class Task implements Runnable {
149 final CountDownLatch tasksRunLatch;
150 final CountDownLatch blockLatch;
151 final ConcurrentMap<Thread, AtomicLong> taskCountPerThread;
152 final AtomicReference<AssertionError> threadError;
153 final String expThreadPrefix;
156 Task( CountDownLatch tasksRunLatch, ConcurrentMap<Thread, AtomicLong> taskCountPerThread,
157 AtomicReference<AssertionError> threadError, String expThreadPrefix, long delay ) {
158 this.tasksRunLatch = tasksRunLatch;
159 this.taskCountPerThread = taskCountPerThread;
160 this.threadError = threadError;
161 this.expThreadPrefix = expThreadPrefix;
166 Task( CountDownLatch tasksRunLatch, CountDownLatch blockLatch ) {
167 this.tasksRunLatch = tasksRunLatch;
168 this.blockLatch = blockLatch;
169 this.taskCountPerThread = null;
170 this.threadError = null;
171 this.expThreadPrefix = null;
180 TimeUnit.MICROSECONDS.sleep( delay );
181 } else if (blockLatch != null) {
184 } catch( InterruptedException e ) {}
186 if (expThreadPrefix != null) {
187 assertEquals( "Thread name starts with " + expThreadPrefix, true,
188 Thread.currentThread().getName().startsWith( expThreadPrefix ) );
191 if (taskCountPerThread != null) {
192 AtomicLong count = taskCountPerThread.get( Thread.currentThread() );
194 count = new AtomicLong( 0 );
195 AtomicLong prev = taskCountPerThread.putIfAbsent( Thread.currentThread(), count );
201 count.incrementAndGet();
204 } catch( AssertionError e ) {
205 if (threadError != null) {
206 threadError.set( e );
209 if (tasksRunLatch != null) {
210 tasksRunLatch.countDown();