2 * Copyright (c) 2014 Brocade Communications Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.yangtools.util.concurrent;
10 import static org.junit.Assert.assertTrue;
11 import static org.junit.Assert.fail;
13 import com.google.common.base.Stopwatch;
15 import java.util.concurrent.ConcurrentHashMap;
16 import java.util.concurrent.ConcurrentMap;
17 import java.util.concurrent.CountDownLatch;
18 import java.util.concurrent.ExecutorService;
19 import java.util.concurrent.RejectedExecutionException;
20 import java.util.concurrent.TimeUnit;
21 import java.util.concurrent.atomic.AtomicLong;
22 import java.util.concurrent.atomic.AtomicReference;
23 import org.junit.After;
24 import org.junit.Test;
25 import org.slf4j.Logger;
26 import org.slf4j.LoggerFactory;
29 * Tests various ThreadPoolExecutor implementations.
31 * @author Thomas Pantelis
33 public class ThreadPoolExecutorTest {
34 private static final Logger LOG = LoggerFactory.getLogger(ThreadPoolExecutorTest.class);
36 private ExecutorService executor;
39 public void tearDown() {
40 if (executor != null) {
41 executor.shutdownNow();
46 public void testFastThreadPoolExecution() throws InterruptedException {
47 testThreadPoolExecution(
48 SpecialExecutors.newBoundedFastThreadPool(50, 100000, "TestPool", getClass()), 100000, "TestPool", 0);
51 @Test(expected = RejectedExecutionException.class)
52 public void testFastThreadPoolRejectingTask() throws InterruptedException {
53 executor = SpecialExecutors.newBoundedFastThreadPool(1, 1, "TestPool", getClass());
55 for (int i = 0; i < 5; i++) {
56 executor.execute(new Task(null, null, null, null, TimeUnit.MICROSECONDS.convert(5, TimeUnit.SECONDS)));
61 public void testBlockingFastThreadPoolExecution() throws InterruptedException {
62 // With a queue capacity of 1, it should block at some point.
63 testThreadPoolExecution(
64 SpecialExecutors.newBlockingBoundedFastThreadPool(2, 1, "TestPool", getClass()), 1000, null, 10);
68 public void testCachedThreadPoolExecution() throws InterruptedException {
69 testThreadPoolExecution(SpecialExecutors.newBoundedCachedThreadPool(10, 100000, "TestPool", getClass()),
70 100000, "TestPool", 0);
73 @Test(expected = RejectedExecutionException.class)
74 public void testCachedThreadRejectingTask() throws InterruptedException {
75 ExecutorService localExecutor = SpecialExecutors.newBoundedCachedThreadPool(1, 1, "TestPool", getClass());
77 for (int i = 0; i < 5; i++) {
78 localExecutor.execute(new Task(null, null, null, null, TimeUnit.MICROSECONDS.convert(5, TimeUnit.SECONDS)));
83 public void testBlockingCachedThreadPoolExecution() throws InterruptedException {
84 testThreadPoolExecution(
85 SpecialExecutors.newBlockingBoundedCachedThreadPool(2, 1, "TestPool", getClass()), 1000, null, 10);
/**
 * Shared routine behind the pool tests: submits {@code numTasksToRun} {@code Task} instances to the given
 * executor, waits for them to count down a latch, and reports any assertion failure raised in a task.
 *
 * @param executorToTest the pool under test; also stored in the {@code executor} field, which
 *        {@code tearDown} shuts down
 * @param numTasksToRun number of tasks to submit, and the initial count of the completion latch
 * @param expThreadPrefix handed to every {@code Task}, which asserts that its thread name starts with this
 *        value when it is non-null
 * @param taskDelay presumably the per-task delay passed on to each {@code Task}; confirm against the
 *        {@code Task} construction
 * @throws InterruptedException if the wait on the completion latch is interrupted
 */
88 void testThreadPoolExecution(final ExecutorService executorToTest, final int numTasksToRun,
89 final String expThreadPrefix, final long taskDelay) throws InterruptedException {
// Keep a reference in the field that tearDown() checks and shuts down.
91 this.executor = executorToTest;
93 LOG.debug("Testing {} with {} tasks.", executorToTest.getClass().getSimpleName(), numTasksToRun);
// Shared state passed to every task: completion latch, per-thread run counters, first assertion failure.
95 final CountDownLatch tasksRunLatch = new CountDownLatch(numTasksToRun);
96 final ConcurrentMap<Thread, AtomicLong> taskCountPerThread = new ConcurrentHashMap<>();
97 final AtomicReference<AssertionError> threadError = new AtomicReference<>();
99 Stopwatch stopWatch = Stopwatch.createStarted();
// One execute() call per task.
104 for (int i = 0; i < numTasksToRun; i++) {
106 // Uninterruptibles.sleepUninterruptibly(20, TimeUnit.MICROSECONDS);
109 executorToTest.execute(new Task(tasksRunLatch, taskCountPerThread, threadError, expThreadPrefix,
// Wait at most 15 seconds for the latch to reach zero; the failure message reports how many tasks counted down.
115 boolean done = tasksRunLatch.await(15, TimeUnit.SECONDS);
120 fail(numTasksToRun - tasksRunLatch.getCount() + " tasks out of " + numTasksToRun + " executed");
// Re-throw, on the calling thread, an AssertionError stored in threadError.
123 if (threadError.get() != null) {
124 throw threadError.get();
// Diagnostics only: tasks run per worker thread, the executor's string form, and the elapsed time.
127 LOG.debug("{} threads used:", taskCountPerThread.size());
128 for (Map.Entry<Thread, AtomicLong> e : taskCountPerThread.entrySet()) {
129 LOG.debug(" {} - {} tasks", e.getKey().getName(), e.getValue());
132 LOG.debug("{}", executorToTest);
133 LOG.debug("Elapsed time: {}", stopWatch);
/**
 * Runnable handed to the executors under test. Depending on which fields are set it may sleep, check the
 * name of the thread running it, bump a per-thread run counter, and count down a completion latch.
 */
136 static class Task implements Runnable {
// Counted down at the end of a run when non-null.
137 final CountDownLatch tasksRunLatch;
// Consulted when the sleep branch is not taken.
138 final CountDownLatch blockLatch;
// Number of runs per worker thread; skipped when null.
139 final ConcurrentMap<Thread, AtomicLong> taskCountPerThread;
// Consulted when an AssertionError is caught during a run.
140 final AtomicReference<AssertionError> threadError;
// Expected start of the worker thread's name; not checked when null.
141 final String expThreadPrefix;
/**
 * Creates a task configured with a latch, per-thread counters, an error holder, an expected thread-name
 * prefix and a delay. The tests in this file pass null for the first four when only the delay matters.
 *
 * @param delay presumably the sleep duration in microseconds (a value named {@code delay} is passed to
 *        {@code TimeUnit.MICROSECONDS.sleep}); confirm
 */
144 Task(final CountDownLatch tasksRunLatch, final ConcurrentMap<Thread, AtomicLong> taskCountPerThread,
145 final AtomicReference<AssertionError> threadError, final String expThreadPrefix, final long delay) {
146 this.tasksRunLatch = tasksRunLatch;
147 this.taskCountPerThread = taskCountPerThread;
148 this.threadError = threadError;
149 this.expThreadPrefix = expThreadPrefix;
/**
 * Creates a task that only carries the two latches; the counter map, error holder and thread-name prefix
 * are set to null.
 */
154 Task(final CountDownLatch tasksRunLatch, final CountDownLatch blockLatch) {
155 this.tasksRunLatch = tasksRunLatch;
156 this.blockLatch = blockLatch;
157 this.taskCountPerThread = null;
158 this.threadError = null;
159 this.expThreadPrefix = null;
168 TimeUnit.MICROSECONDS.sleep(delay);
169 } else if (blockLatch != null) {
172 } catch (InterruptedException e) {
// Optional check that the pool named its worker thread with the expected prefix.
176 if (expThreadPrefix != null) {
177 assertTrue("Thread name starts with " + expThreadPrefix,
178 Thread.currentThread().getName().startsWith(expThreadPrefix));
// Per-thread tally: fetch the current thread's counter, offer a new AtomicLong(0) via putIfAbsent, then increment.
181 if (taskCountPerThread != null) {
182 AtomicLong count = taskCountPerThread.get(Thread.currentThread());
184 count = new AtomicLong(0);
185 AtomicLong prev = taskCountPerThread.putIfAbsent(Thread.currentThread(), count);
191 count.incrementAndGet();
// NOTE(review): presumably the caught failure is stored in threadError for the submitting test to re-throw; confirm.
194 } catch (AssertionError e) {
195 if (threadError != null) {
// Signal completion to whoever waits on the latch.
199 if (tasksRunLatch != null) {
200 tasksRunLatch.countDown();