2 * Copyright (c) 2014 Brocade Communications Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.yangtools.util.concurrent;
10 import static org.junit.Assert.assertTrue;
11 import static org.junit.Assert.fail;
13 import com.google.common.base.Stopwatch;
15 import java.util.concurrent.ConcurrentHashMap;
16 import java.util.concurrent.ConcurrentMap;
17 import java.util.concurrent.CountDownLatch;
18 import java.util.concurrent.ExecutorService;
19 import java.util.concurrent.RejectedExecutionException;
20 import java.util.concurrent.TimeUnit;
21 import java.util.concurrent.atomic.AtomicLong;
22 import java.util.concurrent.atomic.AtomicReference;
23 import org.junit.After;
24 import org.junit.Test;
25 import org.slf4j.Logger;
26 import org.slf4j.LoggerFactory;
29 * Tests various ThreadPoolExecutor implementations.
31 * @author Thomas Pantelis
33 public class ThreadPoolExecutorTest {
34 private static final Logger LOG = LoggerFactory.getLogger(ThreadPoolExecutorTest.class);
36 private ExecutorService executor;
39 public void tearDown() {
40 if (executor != null) {
41 executor.shutdownNow();
46 public void testFastThreadPoolExecution() throws InterruptedException {
47 testThreadPoolExecution(SpecialExecutors.newBoundedFastThreadPool(50, 100000, "TestPool"), 100000, "TestPool",
51 @Test(expected = RejectedExecutionException.class)
52 public void testFastThreadPoolRejectingTask() throws InterruptedException {
53 executor = SpecialExecutors.newBoundedFastThreadPool(1, 1, "TestPool");
55 for (int i = 0; i < 5; i++) {
56 executor.execute(new Task(null, null, null, null, TimeUnit.MICROSECONDS.convert(5, TimeUnit.SECONDS)));
61 public void testBlockingFastThreadPoolExecution() throws InterruptedException {
62 // With a queue capacity of 1, it should block at some point.
63 testThreadPoolExecution(SpecialExecutors.newBlockingBoundedFastThreadPool(2, 1, "TestPool"), 1000, null, 10);
67 public void testCachedThreadPoolExecution() throws InterruptedException {
68 testThreadPoolExecution(SpecialExecutors.newBoundedCachedThreadPool(10, 100000, "TestPool"),
69 100000, "TestPool", 0);
72 @Test(expected = RejectedExecutionException.class)
73 public void testCachedThreadRejectingTask() throws InterruptedException {
74 ExecutorService localExecutor = SpecialExecutors.newBoundedCachedThreadPool(1, 1, "TestPool");
76 for (int i = 0; i < 5; i++) {
77 localExecutor.execute(new Task(null, null, null, null, TimeUnit.MICROSECONDS.convert(5, TimeUnit.SECONDS)));
82 public void testBlockingCachedThreadPoolExecution() throws InterruptedException {
83 testThreadPoolExecution(SpecialExecutors.newBlockingBoundedCachedThreadPool(2, 1, "TestPool"), 1000, null, 10);
86 void testThreadPoolExecution(final ExecutorService executorToTest, final int numTasksToRun,
87 final String expThreadPrefix, final long taskDelay) throws InterruptedException {
89 this.executor = executorToTest;
91 LOG.debug("Testing {} with {} tasks.", executorToTest.getClass().getSimpleName(), numTasksToRun);
93 final CountDownLatch tasksRunLatch = new CountDownLatch(numTasksToRun);
94 final ConcurrentMap<Thread, AtomicLong> taskCountPerThread = new ConcurrentHashMap<>();
95 final AtomicReference<AssertionError> threadError = new AtomicReference<>();
97 Stopwatch stopWatch = Stopwatch.createStarted();
102 for (int i = 0; i < numTasksToRun; i++) {
104 // Uninterruptibles.sleepUninterruptibly(20, TimeUnit.MICROSECONDS);
107 executorToTest.execute(new Task(tasksRunLatch, taskCountPerThread, threadError, expThreadPrefix,
113 boolean done = tasksRunLatch.await(15, TimeUnit.SECONDS);
118 fail(numTasksToRun - tasksRunLatch.getCount() + " tasks out of " + numTasksToRun + " executed");
121 if (threadError.get() != null) {
122 throw threadError.get();
125 LOG.debug("{} threads used:", taskCountPerThread.size());
126 for (Map.Entry<Thread, AtomicLong> e : taskCountPerThread.entrySet()) {
127 LOG.debug(" {} - {} tasks", e.getKey().getName(), e.getValue());
130 LOG.debug("{}", executorToTest);
131 LOG.debug("Elapsed time: {}", stopWatch);
134 static class Task implements Runnable {
135 final CountDownLatch tasksRunLatch;
136 final CountDownLatch blockLatch;
137 final ConcurrentMap<Thread, AtomicLong> taskCountPerThread;
138 final AtomicReference<AssertionError> threadError;
139 final String expThreadPrefix;
142 Task(final CountDownLatch tasksRunLatch, final ConcurrentMap<Thread, AtomicLong> taskCountPerThread,
143 final AtomicReference<AssertionError> threadError, final String expThreadPrefix, final long delay) {
144 this.tasksRunLatch = tasksRunLatch;
145 this.taskCountPerThread = taskCountPerThread;
146 this.threadError = threadError;
147 this.expThreadPrefix = expThreadPrefix;
152 Task(final CountDownLatch tasksRunLatch, final CountDownLatch blockLatch) {
153 this.tasksRunLatch = tasksRunLatch;
154 this.blockLatch = blockLatch;
155 this.taskCountPerThread = null;
156 this.threadError = null;
157 this.expThreadPrefix = null;
166 TimeUnit.MICROSECONDS.sleep(delay);
167 } else if (blockLatch != null) {
170 } catch (InterruptedException e) {
174 if (expThreadPrefix != null) {
175 assertTrue("Thread name starts with " + expThreadPrefix,
176 Thread.currentThread().getName().startsWith(expThreadPrefix));
179 if (taskCountPerThread != null) {
180 AtomicLong count = taskCountPerThread.get(Thread.currentThread());
182 count = new AtomicLong(0);
183 AtomicLong prev = taskCountPerThread.putIfAbsent(Thread.currentThread(), count);
189 count.incrementAndGet();
192 } catch (AssertionError e) {
193 if (threadError != null) {
197 if (tasksRunLatch != null) {
198 tasksRunLatch.countDown();