2 * Copyright (c) 2014 Brocade Communications Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.yangtools.util.concurrent;
10 import static org.junit.Assert.assertEquals;
11 import static org.junit.Assert.fail;
13 import com.google.common.base.Stopwatch;
15 import java.util.concurrent.ConcurrentHashMap;
16 import java.util.concurrent.ConcurrentMap;
17 import java.util.concurrent.CountDownLatch;
18 import java.util.concurrent.ExecutorService;
19 import java.util.concurrent.RejectedExecutionException;
20 import java.util.concurrent.TimeUnit;
21 import java.util.concurrent.atomic.AtomicLong;
22 import java.util.concurrent.atomic.AtomicReference;
23 import org.junit.After;
24 import org.junit.Test;
27 * Tests various ThreadPoolExecutor implementations.
29 * @author Thomas Pantelis
31 public class ThreadPoolExecutorTest {
// Executor under test. It is assigned by test code in this class (directly, or through
// testThreadPoolExecution) and shut down by tearDown() when it is non-null.
33 private ExecutorService executor;
36 public void tearDown() {
37 if (executor != null) {
38 executor.shutdownNow();
43 public void testFastThreadPoolExecution() throws Exception {
45 testThreadPoolExecution(
46 SpecialExecutors.newBoundedFastThreadPool( 50, 100000, "TestPool" ),
47 100000, "TestPool", 0 );
50 @Test(expected = RejectedExecutionException.class)
51 public void testFastThreadPoolRejectingTask() throws Exception {
53 executor = SpecialExecutors.newBoundedFastThreadPool( 1, 1, "TestPool" );
55 for (int i = 0; i < 5; i++) {
56 executor.execute( new Task( null, null, null, null,
57 TimeUnit.MICROSECONDS.convert( 5, TimeUnit.SECONDS ) ) );
// Runs the shared testThreadPoolExecution scenario against a blocking bounded fast thread pool
// created with the arguments (2, 1, "TestPool").
// NOTE(review): the listing numbers embedded in these lines skip after 66, so the remaining
// arguments of the call and the method's closing brace are not visible in this excerpt.
62 public void testBlockingFastThreadPoolExecution() throws Exception {
64 // With a queue capacity of 1, it should block at some point.
65 testThreadPoolExecution(
// First argument: the executor under test.
66 SpecialExecutors.newBlockingBoundedFastThreadPool( 2, 1, "TestPool" ),
71 public void testCachedThreadPoolExecution() throws Exception {
73 testThreadPoolExecution(
74 SpecialExecutors.newBoundedCachedThreadPool( 10, 100000, "TestPool" ),
75 100000, "TestPool", 0 );
78 @Test(expected = RejectedExecutionException.class)
79 public void testCachedThreadRejectingTask() throws Exception {
81 ExecutorService executor = SpecialExecutors.newBoundedCachedThreadPool( 1, 1, "TestPool" );
83 for (int i = 0; i < 5; i++) {
84 executor.execute( new Task( null, null, null, null,
85 TimeUnit.MICROSECONDS.convert( 5, TimeUnit.SECONDS ) ) );
// Runs the shared testThreadPoolExecution scenario against a blocking bounded cached thread pool
// created with the arguments (2, 1, "TestPool").
// NOTE(review): the listing numbers embedded in these lines skip after 93, so the remaining
// arguments of the call and the method's closing brace are not visible in this excerpt.
90 public void testBlockingCachedThreadPoolExecution() throws Exception {
92 testThreadPoolExecution(
// First argument: the executor under test.
93 SpecialExecutors.newBlockingBoundedCachedThreadPool( 2, 1, "TestPool" ),
// Shared scenario used by the pool tests: submits numTasksToRun Task instances to the given
// executor, waits up to 15 seconds for them to finish, rethrows any AssertionError recorded by
// a task, and prints per-thread task counts, the executor and the elapsed time.
//
// executor        - pool under test; also stored in the field so tearDown() can shut it down
// numTasksToRun   - number of Task instances to submit (also the initial latch count)
// expThreadPrefix - expected worker thread-name prefix handed to each Task; Task skips the
//                   name check when this is null
// taskDelay       - delay handed to each Task, which sleeps in microseconds
//
// NOTE(review): the listing numbers embedded in these lines skip (109-112, 120-123, 125-128,
// ...), so some statements and all closing braces of this method are not visible here.
97 void testThreadPoolExecution( final ExecutorService executor,
98 final int numTasksToRun, final String expThreadPrefix, final long taskDelay ) throws Exception {
// Record the pool in the field so tearDown() can shut it down.
100 this.executor = executor;
102 System.out.println("\nTesting " + executor.getClass().getSimpleName() + " with " + numTasksToRun + " tasks.");
// One count per task; each Task counts it down as it finishes.
104 final CountDownLatch tasksRunLatch = new CountDownLatch( numTasksToRun );
// Filled in by the tasks: worker thread -> number of tasks it ran.
105 final ConcurrentMap<Thread, AtomicLong> taskCountPerThread = new ConcurrentHashMap<>();
// Carries an AssertionError raised on a worker thread back to this thread.
106 final AtomicReference<AssertionError> threadError = new AtomicReference<>();
// Started before submission; printed as the elapsed time at the end.
108 Stopwatch stopWatch = Stopwatch.createStarted();
113 for (int i = 0; i < numTasksToRun; i++) {
115 // Uninterruptibles.sleepUninterruptibly( 20, TimeUnit.MICROSECONDS );
118 executor.execute( new Task( tasksRunLatch, taskCountPerThread,
119 threadError, expThreadPrefix, taskDelay ) );
// Wait for every task to count down, giving up after 15 seconds.
124 boolean done = tasksRunLatch.await( 15, TimeUnit.SECONDS );
// Reports how many tasks completed. NOTE(review): the condition guarding this call is not
// visible in this excerpt - presumably it only runs when `done` is false; confirm.
129 fail((numTasksToRun - tasksRunLatch.getCount()) + " tasks out of " + numTasksToRun + " executed");
// Surface an assertion failure captured on a worker thread as this test's failure.
132 if (threadError.get() != null) {
133 throw threadError.get();
136 System.out.println( taskCountPerThread.size() + " threads used:" );
// NOTE(review): no import of java.util.Map is visible in this excerpt - confirm it exists.
137 for (Map.Entry<Thread, AtomicLong> e : taskCountPerThread.entrySet()) {
138 System.out.println( " " + e.getKey().getName() + " - " + e.getValue() + " tasks" );
141 System.out.println( "\n" + executor );
142 System.out.println( "\nElapsed time: " + stopWatch );
143 System.out.println();
// Runnable submitted to the executors by the tests in this file. The visible lines show it
// sleeping for `delay` microseconds in one branch, checking the running thread's name against
// expThreadPrefix, counting tasks per thread, recording any AssertionError in threadError and
// counting down tasksRunLatch. Each of the last four steps is skipped when its field is null.
// NOTE(review): the listing numbers embedded in these lines skip (152-153, 160-163, 170-177,
// 180-181, 192, 195-199, ...), so some declarations and statements of this class, including
// the header of the run logic, are not visible in this excerpt.
146 static class Task implements Runnable {
// Counted down in the last visible statement when non-null.
147 final CountDownLatch tasksRunLatch;
// Supplied by the two-argument constructor; selects the `else if` branch further down.
148 final CountDownLatch blockLatch;
// Worker thread -> number of tasks that thread has run.
149 final ConcurrentMap<Thread, AtomicLong> taskCountPerThread;
// Holder for an AssertionError raised while the task runs.
150 final AtomicReference<AssertionError> threadError;
// Expected prefix of the worker thread's name; null skips the name check.
151 final String expThreadPrefix;
// NOTE(review): a `delay` value is read further down, but its declaration is not visible here.

// Constructor used for tasks that do the bookkeeping and name check.
// NOTE(review): what is done with the `delay` parameter is on lines not visible here.
154 Task( CountDownLatch tasksRunLatch, ConcurrentMap<Thread, AtomicLong> taskCountPerThread,
155 AtomicReference<AssertionError> threadError, String expThreadPrefix, long delay ) {
156 this.tasksRunLatch = tasksRunLatch;
157 this.taskCountPerThread = taskCountPerThread;
158 this.threadError = threadError;
159 this.expThreadPrefix = expThreadPrefix;
// Constructor for tasks driven by a blockLatch; it leaves the bookkeeping fields null, which
// disables the name check, the per-thread counting and the error capture below.
164 Task( CountDownLatch tasksRunLatch, CountDownLatch blockLatch ) {
165 this.tasksRunLatch = tasksRunLatch;
166 this.blockLatch = blockLatch;
167 this.taskCountPerThread = null;
168 this.threadError = null;
169 this.expThreadPrefix = null;
// Sleep for `delay` microseconds. NOTE(review): the condition guarding this branch is not visible.
178 TimeUnit.MICROSECONDS.sleep( delay );
// Alternative branch for tasks built with a blockLatch. NOTE(review): the branch body is not
// visible - presumably it waits on the latch; confirm.
179 } else if (blockLatch != null) {
// NOTE(review): the handler body for the interrupt is not visible in this excerpt.
182 } catch (InterruptedException e) {
// Raise an AssertionError if the current thread's name does not start with the expected prefix.
185 if (expThreadPrefix != null) {
186 assertEquals( "Thread name starts with " + expThreadPrefix, true,
187 Thread.currentThread().getName().startsWith( expThreadPrefix ) );
// Per-thread counting: look up this thread's counter, registering a new one via putIfAbsent.
// NOTE(review): the null check on `count` and the handling of `prev` are on lines not visible here.
190 if (taskCountPerThread != null) {
191 AtomicLong count = taskCountPerThread.get( Thread.currentThread() );
193 count = new AtomicLong( 0 );
194 AtomicLong prev = taskCountPerThread.putIfAbsent( Thread.currentThread(), count );
200 count.incrementAndGet();
// Record the failure so the code that submitted the task can read it from threadError.
203 } catch (AssertionError e) {
204 if (threadError != null) {
205 threadError.set( e );
// Signal that this task has finished.
208 if (tasksRunLatch != null) {
209 tasksRunLatch.countDown();