Fix eclipse/checkstyle warnings
[yangtools.git] / common / util / src / test / java / org / opendaylight / yangtools / util / concurrent / ThreadPoolExecutorTest.java
1 /*
2  * Copyright (c) 2014 Brocade Communications Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.yangtools.util.concurrent;
9
10 import static org.junit.Assert.assertTrue;
11 import static org.junit.Assert.fail;
12
13 import com.google.common.base.Stopwatch;
14 import java.util.Map;
15 import java.util.concurrent.ConcurrentHashMap;
16 import java.util.concurrent.ConcurrentMap;
17 import java.util.concurrent.CountDownLatch;
18 import java.util.concurrent.ExecutorService;
19 import java.util.concurrent.RejectedExecutionException;
20 import java.util.concurrent.TimeUnit;
21 import java.util.concurrent.atomic.AtomicLong;
22 import java.util.concurrent.atomic.AtomicReference;
23 import org.junit.After;
24 import org.junit.Test;
25 import org.slf4j.Logger;
26 import org.slf4j.LoggerFactory;
27
28 /**
29  * Tests various ThreadPoolExecutor implementations.
30  *
31  * @author Thomas Pantelis
32  */
33 public class ThreadPoolExecutorTest {
34     private static final Logger LOG = LoggerFactory.getLogger(ThreadPoolExecutorTest.class);
35
36     private ExecutorService executor;
37
38     @After
39     public void tearDown() {
40         if (executor != null) {
41             executor.shutdownNow();
42         }
43     }
44
45     @Test
46     public void testFastThreadPoolExecution() throws InterruptedException {
47         testThreadPoolExecution(SpecialExecutors.newBoundedFastThreadPool(50, 100000, "TestPool"), 100000, "TestPool",
48             0);
49     }
50
51     @Test(expected = RejectedExecutionException.class)
52     public void testFastThreadPoolRejectingTask() throws InterruptedException {
53         executor = SpecialExecutors.newBoundedFastThreadPool(1, 1, "TestPool");
54
55         for (int i = 0; i < 5; i++) {
56             executor.execute(new Task(null, null, null, null, TimeUnit.MICROSECONDS.convert(5, TimeUnit.SECONDS)));
57         }
58     }
59
60     @Test
61     public void testBlockingFastThreadPoolExecution() throws InterruptedException {
62         // With a queue capacity of 1, it should block at some point.
63         testThreadPoolExecution(SpecialExecutors.newBlockingBoundedFastThreadPool(2, 1, "TestPool"), 1000, null, 10);
64     }
65
66     @Test
67     public void testCachedThreadPoolExecution() throws InterruptedException {
68         testThreadPoolExecution(SpecialExecutors.newBoundedCachedThreadPool(10, 100000, "TestPool"),
69                 100000, "TestPool", 0);
70     }
71
72     @Test(expected = RejectedExecutionException.class)
73     public void testCachedThreadRejectingTask() throws InterruptedException {
74         ExecutorService executor = SpecialExecutors.newBoundedCachedThreadPool(1, 1, "TestPool");
75
76         for (int i = 0; i < 5; i++) {
77             executor.execute(new Task(null, null, null, null, TimeUnit.MICROSECONDS.convert(5, TimeUnit.SECONDS)));
78         }
79     }
80
81     @Test
82     public void testBlockingCachedThreadPoolExecution() throws InterruptedException {
83         testThreadPoolExecution(SpecialExecutors.newBlockingBoundedCachedThreadPool(2, 1, "TestPool"), 1000, null, 10);
84     }
85
86     void testThreadPoolExecution(final ExecutorService executor, final int numTasksToRun, final String expThreadPrefix,
87             final long taskDelay) throws InterruptedException {
88
89         this.executor = executor;
90
91         LOG.debug("Testing {} with {} tasks.", executor.getClass().getSimpleName(), numTasksToRun);
92
93         final CountDownLatch tasksRunLatch = new CountDownLatch(numTasksToRun);
94         final ConcurrentMap<Thread, AtomicLong> taskCountPerThread = new ConcurrentHashMap<>();
95         final AtomicReference<AssertionError> threadError = new AtomicReference<>();
96
97         Stopwatch stopWatch = Stopwatch.createStarted();
98
99         new Thread() {
100             @Override
101             public void run() {
102                 for (int i = 0; i < numTasksToRun; i++) {
103 //                    if (i%100 == 0) {
104 //                        Uninterruptibles.sleepUninterruptibly(20, TimeUnit.MICROSECONDS);
105 //                    }
106
107                     executor.execute(new Task(tasksRunLatch, taskCountPerThread, threadError, expThreadPrefix,
108                         taskDelay));
109                 }
110             }
111         }.start();
112
113         boolean done = tasksRunLatch.await(15, TimeUnit.SECONDS);
114
115         stopWatch.stop();
116
117         if (!done) {
118             fail(numTasksToRun - tasksRunLatch.getCount() + " tasks out of " + numTasksToRun + " executed");
119         }
120
121         if (threadError.get() != null) {
122             throw threadError.get();
123         }
124
125         LOG.debug("{} threads used:", taskCountPerThread.size());
126         for (Map.Entry<Thread, AtomicLong> e : taskCountPerThread.entrySet()) {
127             LOG.debug("  {} - {} tasks", e.getKey().getName(), e.getValue());
128         }
129
130         LOG.debug("{}", executor);
131         LOG.debug("Elapsed time: {}", stopWatch);
132     }
133
134     static class Task implements Runnable {
135         final CountDownLatch tasksRunLatch;
136         final CountDownLatch blockLatch;
137         final ConcurrentMap<Thread, AtomicLong> taskCountPerThread;
138         final AtomicReference<AssertionError> threadError;
139         final String expThreadPrefix;
140         final long delay;
141
142         Task(final CountDownLatch tasksRunLatch, final ConcurrentMap<Thread, AtomicLong> taskCountPerThread,
143                 final AtomicReference<AssertionError> threadError, final String expThreadPrefix, final long delay) {
144             this.tasksRunLatch = tasksRunLatch;
145             this.taskCountPerThread = taskCountPerThread;
146             this.threadError = threadError;
147             this.expThreadPrefix = expThreadPrefix;
148             this.delay = delay;
149             blockLatch = null;
150         }
151
152         Task(final CountDownLatch tasksRunLatch, final CountDownLatch blockLatch) {
153             this.tasksRunLatch = tasksRunLatch;
154             this.blockLatch = blockLatch;
155             this.taskCountPerThread = null;
156             this.threadError = null;
157             this.expThreadPrefix = null;
158             this.delay = 0;
159         }
160
161         @Override
162         public void run() {
163             try {
164                 try {
165                     if (delay > 0) {
166                         TimeUnit.MICROSECONDS.sleep(delay);
167                     } else if (blockLatch != null) {
168                         blockLatch.await();
169                     }
170                 } catch (InterruptedException e) {
171                     // Ignored
172                 }
173
174                 if (expThreadPrefix != null) {
175                     assertTrue("Thread name starts with " + expThreadPrefix,
176                             Thread.currentThread().getName().startsWith(expThreadPrefix));
177                 }
178
179                 if (taskCountPerThread != null) {
180                     AtomicLong count = taskCountPerThread.get(Thread.currentThread());
181                     if (count == null) {
182                         count = new AtomicLong(0);
183                         AtomicLong prev = taskCountPerThread.putIfAbsent(Thread.currentThread(), count);
184                         if (prev != null) {
185                             count = prev;
186                         }
187                     }
188
189                     count.incrementAndGet();
190                 }
191
192             } catch (AssertionError e) {
193                 if (threadError != null) {
194                     threadError.set(e);
195                 }
196             } finally {
197                 if (tasksRunLatch != null) {
198                     tasksRunLatch.countDown();
199                 }
200             }
201         }
202     }
203 }