Migrate common/util to JUnit5
[yangtools.git] / common / util / src / test / java / org / opendaylight / yangtools / util / concurrent / ThreadPoolExecutorTest.java
1 /*
2  * Copyright (c) 2014 Brocade Communications Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.yangtools.util.concurrent;
9
10 import static org.junit.jupiter.api.Assertions.assertThrows;
11 import static org.junit.jupiter.api.Assertions.assertTrue;
12 import static org.junit.jupiter.api.Assertions.fail;
13
14 import com.google.common.base.Stopwatch;
15 import java.util.Map;
16 import java.util.concurrent.ConcurrentHashMap;
17 import java.util.concurrent.ConcurrentMap;
18 import java.util.concurrent.CountDownLatch;
19 import java.util.concurrent.ExecutorService;
20 import java.util.concurrent.RejectedExecutionException;
21 import java.util.concurrent.TimeUnit;
22 import java.util.concurrent.atomic.AtomicLong;
23 import java.util.concurrent.atomic.AtomicReference;
24 import org.junit.jupiter.api.AfterEach;
25 import org.junit.jupiter.api.Test;
26 import org.slf4j.Logger;
27 import org.slf4j.LoggerFactory;
28
29 /**
30  * Tests various ThreadPoolExecutor implementations.
31  *
32  * @author Thomas Pantelis
33  */
34 class ThreadPoolExecutorTest {
35     private static final Logger LOG = LoggerFactory.getLogger(ThreadPoolExecutorTest.class);
36
37     private ExecutorService executor;
38
39     @AfterEach
40     void tearDown() {
41         if (executor != null) {
42             executor.shutdownNow();
43         }
44     }
45
46     @Test
47     void testFastThreadPoolExecution() throws InterruptedException {
48         testThreadPoolExecution(
49                 SpecialExecutors.newBoundedFastThreadPool(50, 100000, "TestPool", getClass()), 100000, "TestPool", 0);
50     }
51
52     @Test
53     void testFastThreadPoolRejectingTask() throws InterruptedException {
54         assertThrows(RejectedExecutionException.class, () -> {
55             executor = SpecialExecutors.newBoundedFastThreadPool(1, 1, "TestPool", getClass());
56
57             for (int i = 0; i < 5; i++) {
58                 executor.execute(new Task(null, null, null, null, TimeUnit.MICROSECONDS.convert(5, TimeUnit.SECONDS)));
59             }
60         });
61     }
62
63     @Test
64     void testBlockingFastThreadPoolExecution() throws InterruptedException {
65         // With a queue capacity of 1, it should block at some point.
66         testThreadPoolExecution(
67                 SpecialExecutors.newBlockingBoundedFastThreadPool(2, 1, "TestPool", getClass()), 1000, null, 10);
68     }
69
70     @Test
71     void testCachedThreadPoolExecution() throws InterruptedException {
72         testThreadPoolExecution(SpecialExecutors.newBoundedCachedThreadPool(10, 100000, "TestPool", getClass()),
73                 100000, "TestPool", 0);
74     }
75
76     @Test
77     void testCachedThreadRejectingTask() throws InterruptedException {
78         assertThrows(RejectedExecutionException.class, () -> {
79             ExecutorService localExecutor = SpecialExecutors.newBoundedCachedThreadPool(1, 1, "TestPool", getClass());
80
81             for (int i = 0; i < 5; i++) {
82                 localExecutor.execute(new Task(null, null, null, null, TimeUnit.MICROSECONDS.convert(5,
83                         TimeUnit.SECONDS)));
84             }
85         });
86     }
87
88     @Test
89     void testBlockingCachedThreadPoolExecution() throws InterruptedException {
90         testThreadPoolExecution(
91                 SpecialExecutors.newBlockingBoundedCachedThreadPool(2, 1, "TestPool", getClass()), 1000, null, 10);
92     }
93
94     void testThreadPoolExecution(final ExecutorService executorToTest, final int numTasksToRun,
95             final String expThreadPrefix, final long taskDelay) throws InterruptedException {
96
97         this.executor = executorToTest;
98
99         LOG.debug("Testing {} with {} tasks.", executorToTest.getClass().getSimpleName(), numTasksToRun);
100
101         final var tasksRunLatch = new CountDownLatch(numTasksToRun);
102         final var taskCountPerThread = new ConcurrentHashMap<Thread, AtomicLong>();
103         final var threadError = new AtomicReference<AssertionError>();
104
105         final var stopWatch = Stopwatch.createStarted();
106
107         new Thread() {
108             @Override
109             public void run() {
110                 for (int i = 0; i < numTasksToRun; i++) {
111 //                    if (i%100 == 0) {
112 //                        Uninterruptibles.sleepUninterruptibly(20, TimeUnit.MICROSECONDS);
113 //                    }
114
115                     executorToTest.execute(new Task(tasksRunLatch, taskCountPerThread, threadError, expThreadPrefix,
116                         taskDelay));
117                 }
118             }
119         }.start();
120
121         final var done = tasksRunLatch.await(15, TimeUnit.SECONDS);
122
123         stopWatch.stop();
124
125         if (!done) {
126             fail(numTasksToRun - tasksRunLatch.getCount() + " tasks out of " + numTasksToRun + " executed");
127         }
128
129         if (threadError.get() != null) {
130             throw threadError.get();
131         }
132
133         LOG.debug("{} threads used:", taskCountPerThread.size());
134         for (Map.Entry<Thread, AtomicLong> e : taskCountPerThread.entrySet()) {
135             LOG.debug("  {} - {} tasks", e.getKey().getName(), e.getValue());
136         }
137
138         LOG.debug("{}", executorToTest);
139         LOG.debug("Elapsed time: {}", stopWatch);
140     }
141
142     static class Task implements Runnable {
143         final CountDownLatch tasksRunLatch;
144         final CountDownLatch blockLatch;
145         final ConcurrentMap<Thread, AtomicLong> taskCountPerThread;
146         final AtomicReference<AssertionError> threadError;
147         final String expThreadPrefix;
148         final long delay;
149
150         Task(final CountDownLatch tasksRunLatch, final ConcurrentMap<Thread, AtomicLong> taskCountPerThread,
151                 final AtomicReference<AssertionError> threadError, final String expThreadPrefix, final long delay) {
152             this.tasksRunLatch = tasksRunLatch;
153             this.taskCountPerThread = taskCountPerThread;
154             this.threadError = threadError;
155             this.expThreadPrefix = expThreadPrefix;
156             this.delay = delay;
157             blockLatch = null;
158         }
159
160         Task(final CountDownLatch tasksRunLatch, final CountDownLatch blockLatch) {
161             this.tasksRunLatch = tasksRunLatch;
162             this.blockLatch = blockLatch;
163             this.taskCountPerThread = null;
164             this.threadError = null;
165             this.expThreadPrefix = null;
166             this.delay = 0;
167         }
168
169         @Override
170         public void run() {
171             try {
172                 try {
173                     if (delay > 0) {
174                         TimeUnit.MICROSECONDS.sleep(delay);
175                     } else if (blockLatch != null) {
176                         blockLatch.await();
177                     }
178                 } catch (InterruptedException e) {
179                     // Ignored
180                 }
181
182                 if (expThreadPrefix != null) {
183                     assertTrue(Thread.currentThread().getName().startsWith(expThreadPrefix),
184                             "Thread name starts with " + expThreadPrefix);
185                 }
186
187                 if (taskCountPerThread != null) {
188                     AtomicLong count = taskCountPerThread.get(Thread.currentThread());
189                     if (count == null) {
190                         count = new AtomicLong(0);
191                         AtomicLong prev = taskCountPerThread.putIfAbsent(Thread.currentThread(), count);
192                         if (prev != null) {
193                             count = prev;
194                         }
195                     }
196
197                     count.incrementAndGet();
198                 }
199
200             } catch (AssertionError e) {
201                 if (threadError != null) {
202                     threadError.set(e);
203                 }
204             } finally {
205                 if (tasksRunLatch != null) {
206                     tasksRunLatch.countDown();
207                 }
208             }
209         }
210     }
211 }