2 * Copyright (c) 2014 Brocade Communications Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.yangtools.util.concurrent;
10 import static org.junit.jupiter.api.Assertions.assertThrows;
11 import static org.junit.jupiter.api.Assertions.assertTrue;
12 import static org.junit.jupiter.api.Assertions.fail;
14 import com.google.common.base.Stopwatch;
16 import java.util.concurrent.ConcurrentHashMap;
17 import java.util.concurrent.ConcurrentMap;
18 import java.util.concurrent.CountDownLatch;
19 import java.util.concurrent.ExecutorService;
20 import java.util.concurrent.RejectedExecutionException;
21 import java.util.concurrent.TimeUnit;
22 import java.util.concurrent.atomic.AtomicLong;
23 import java.util.concurrent.atomic.AtomicReference;
24 import org.junit.jupiter.api.AfterEach;
25 import org.junit.jupiter.api.Test;
26 import org.slf4j.Logger;
27 import org.slf4j.LoggerFactory;
30 * Tests various ThreadPoolExecutor implementations.
32 * @author Thomas Pantelis
// Test class exercising the executors produced by the SpecialExecutors factory methods.
34 class ThreadPoolExecutorTest {
// Class logger; used for debug output while running the shared execution test.
35 private static final Logger LOG = LoggerFactory.getLogger(ThreadPoolExecutorTest.class);
// Executor currently under test. Tests that assign it here get it stopped by the null-checked
// shutdownNow() call that follows.
37 private ExecutorService executor;
// NOTE(review): the enclosing method header is not visible in this excerpt; presumably this is the
// @AfterEach teardown (AfterEach is imported) — confirm.
// Forcibly stops the executor a test stored in the field, if any.
41 if (executor != null) {
42 executor.shutdownNow();
// Runs 100000 zero-delay tasks on a bounded fast thread pool and checks that the worker thread
// names start with "TestPool".
// NOTE(review): the factory arguments (50, 100000) are presumably the maximum pool size and the
// queue capacity — confirm against SpecialExecutors.
47 void testFastThreadPoolExecution() throws InterruptedException {
48 testThreadPoolExecution(
49 SpecialExecutors.newBoundedFastThreadPool(50, 100000, "TestPool", getClass()), 100000, "TestPool", 0);
// Expects a RejectedExecutionException when five long-running tasks are submitted to a bounded
// fast thread pool created with arguments (1, 1).
53 void testFastThreadPoolRejectingTask() throws InterruptedException {
54 assertThrows(RejectedExecutionException.class, () -> {
// Stored in the field so the null-checked shutdownNow() on that field can stop the pool afterwards.
55 executor = SpecialExecutors.newBoundedFastThreadPool(1, 1, "TestPool", getClass());
// Each task is given a delay of 5 seconds expressed in microseconds; no latch, counting map,
// error holder or thread-name prefix is supplied.
57 for (int i = 0; i < 5; i++) {
58 executor.execute(new Task(null, null, null, null, TimeUnit.MICROSECONDS.convert(5, TimeUnit.SECONDS)));
// Runs 1000 tasks, each with a delay value of 10, on a blocking bounded fast thread pool created
// with arguments (2, 1). No thread-name prefix is checked (expThreadPrefix is null).
64 void testBlockingFastThreadPoolExecution() throws InterruptedException {
65 // With a queue capacity of 1, it should block at some point.
66 testThreadPoolExecution(
67 SpecialExecutors.newBlockingBoundedFastThreadPool(2, 1, "TestPool", getClass()), 1000, null, 10);
// Runs 100000 zero-delay tasks on a bounded cached thread pool and checks that the worker thread
// names start with "TestPool".
// NOTE(review): the factory arguments (10, 100000) are presumably the maximum pool size and the
// queue capacity — confirm against SpecialExecutors.
71 void testCachedThreadPoolExecution() throws InterruptedException {
72 testThreadPoolExecution(SpecialExecutors.newBoundedCachedThreadPool(10, 100000, "TestPool", getClass()),
73 100000, "TestPool", 0);
// Expects a RejectedExecutionException when several tasks are submitted to a bounded cached
// thread pool created with arguments (1, 1).
77 void testCachedThreadRejectingTask() throws InterruptedException {
78 assertThrows(RejectedExecutionException.class, () -> {
// NOTE(review): the pool is held in a local variable rather than the executor field, so the
// null-checked shutdownNow() on that field will not stop it. Confirm whether it is shut down in
// the lines not visible here; otherwise its worker threads may outlive the test.
79 ExecutorService localExecutor = SpecialExecutors.newBoundedCachedThreadPool(1, 1, "TestPool", getClass());
// Submits up to five tasks with no latch, counting map, error holder or thread-name prefix.
81 for (int i = 0; i < 5; i++) {
82 localExecutor.execute(new Task(null, null, null, null, TimeUnit.MICROSECONDS.convert(5,
// Runs 1000 tasks, each with a delay value of 10, on a blocking bounded cached thread pool created
// with arguments (2, 1). No thread-name prefix is checked (expThreadPrefix is null).
89 void testBlockingCachedThreadPoolExecution() throws InterruptedException {
90 testThreadPoolExecution(
91 SpecialExecutors.newBlockingBoundedCachedThreadPool(2, 1, "TestPool", getClass()), 1000, null, 10);
// Shared driver: submits numTasksToRun Task instances to executorToTest, waits for them to finish
// and logs which threads ran how many tasks.
//
// executorToTest  - the executor being exercised; also stored in the executor field
// numTasksToRun   - number of tasks to submit and the initial count of the completion latch
// expThreadPrefix - prefix each worker thread name must start with, or null to skip that check
// taskDelay       - per-task delay handed to each Task
//
// Throws InterruptedException if the calling thread is interrupted while waiting on the latch.
94 void testThreadPoolExecution(final ExecutorService executorToTest, final int numTasksToRun,
95 final String expThreadPrefix, final long taskDelay) throws InterruptedException {
// Remember the executor in the field so the null-checked shutdownNow() on that field can stop it.
97 this.executor = executorToTest;
99 LOG.debug("Testing {} with {} tasks.", executorToTest.getClass().getSimpleName(), numTasksToRun);
// Counted down once per finished task.
101 final var tasksRunLatch = new CountDownLatch(numTasksToRun);
// Number of tasks executed by each worker thread.
102 final var taskCountPerThread = new ConcurrentHashMap<Thread, AtomicLong>();
// Holder for an assertion failure raised on a worker thread, so it can be rethrown below.
103 final var threadError = new AtomicReference<AssertionError>();
105 final var stopWatch = Stopwatch.createStarted();
110 for (int i = 0; i < numTasksToRun; i++) {
112 // Uninterruptibles.sleepUninterruptibly(20, TimeUnit.MICROSECONDS);
115 executorToTest.execute(new Task(tasksRunLatch, taskCountPerThread, threadError, expThreadPrefix,
// Wait up to 15 seconds for every task to count the latch down.
121 final var done = tasksRunLatch.await(15, TimeUnit.SECONDS);
// Failure message reports how many tasks completed (total minus the remaining latch count).
// NOTE(review): the guarding condition is not visible here; presumably reached only when the
// await above timed out — confirm.
126 fail(numTasksToRun - tasksRunLatch.getCount() + " tasks out of " + numTasksToRun + " executed");
// Rethrow, on the test thread, any assertion failure captured in the holder.
129 if (threadError.get() != null) {
130 throw threadError.get();
// Debug summary: one line per worker thread with its task count, then the executor and elapsed time.
133 LOG.debug("{} threads used:", taskCountPerThread.size());
134 for (Map.Entry<Thread, AtomicLong> e : taskCountPerThread.entrySet()) {
135 LOG.debug(" {} - {} tasks", e.getKey().getName(), e.getValue());
138 LOG.debug("{}", executorToTest);
139 LOG.debug("Elapsed time: {}", stopWatch);
// Runnable submitted to the executors under test. Every collaborator below is optional: the
// visible code null-checks each one before using it.
142 static class Task implements Runnable {
// Counted down when the task finishes, if non-null.
143 final CountDownLatch tasksRunLatch;
// Optional latch consulted when the task does not take the sleep branch.
144 final CountDownLatch blockLatch;
// Per-thread counters of executed tasks, keyed by the executing thread.
145 final ConcurrentMap<Thread, AtomicLong> taskCountPerThread;
// Holder for an AssertionError; consulted when an assertion fails inside the task.
146 final AtomicReference<AssertionError> threadError;
// Prefix the executing thread's name must start with, or null to skip the check.
147 final String expThreadPrefix;
// Creates a task that can count executions per thread and verify the worker thread name.
// Any of the reference arguments may be null, which disables the corresponding behaviour.
// NOTE(review): the assignments of delay and blockLatch are not visible in this excerpt;
// presumably delay is stored and blockLatch is set to null — confirm.
150 Task(final CountDownLatch tasksRunLatch, final ConcurrentMap<Thread, AtomicLong> taskCountPerThread,
151 final AtomicReference<AssertionError> threadError, final String expThreadPrefix, final long delay) {
152 this.tasksRunLatch = tasksRunLatch;
153 this.taskCountPerThread = taskCountPerThread;
154 this.threadError = threadError;
155 this.expThreadPrefix = expThreadPrefix;
// Creates a task driven by a block latch instead of a delay. Per-thread counting, error capture
// and the thread-name check are all disabled by setting their fields to null.
160 Task(final CountDownLatch tasksRunLatch, final CountDownLatch blockLatch) {
161 this.tasksRunLatch = tasksRunLatch;
162 this.blockLatch = blockLatch;
163 this.taskCountPerThread = null;
164 this.threadError = null;
165 this.expThreadPrefix = null;
// NOTE(review): the method header and the condition guarding this sleep are not visible in this
// excerpt; presumably this is the body of run() — confirm.
// Sleeps for the configured delay, interpreted as microseconds.
174 TimeUnit.MICROSECONDS.sleep(delay);
// Alternative branch taken when a block latch was supplied; what it does with the latch is not
// visible here.
175 } else if (blockLatch != null) {
// Interruption of the wait above is caught here; the handler body is not visible.
178 } catch (InterruptedException e) {
// Verify the worker thread's name only when a prefix was requested.
182 if (expThreadPrefix != null) {
183 assertTrue(Thread.currentThread().getName().startsWith(expThreadPrefix),
184 "Thread name starts with " + expThreadPrefix);
// Count this execution against the current thread, only when a map was supplied.
187 if (taskCountPerThread != null) {
188 AtomicLong count = taskCountPerThread.get(Thread.currentThread());
// Create a fresh counter and publish it with putIfAbsent.
// NOTE(review): the check that guards this creation and the handling of a previously present
// value are in lines not visible here.
190 count = new AtomicLong(0);
191 AtomicLong prev = taskCountPerThread.putIfAbsent(Thread.currentThread(), count);
197 count.incrementAndGet();
// Assertion failures raised on this worker thread are caught here.
// NOTE(review): presumably the error is stored in threadError when that holder is non-null —
// the statement is not visible here; confirm.
200 } catch (AssertionError e) {
201 if (threadError != null) {
// Signal completion to whoever waits on the latch, if one was supplied.
205 if (tasksRunLatch != null) {
206 tasksRunLatch.countDown();