Merge branch 'master' of ../controller
[yangtools.git] / common / util / src / test / java / org / opendaylight / yangtools / util / concurrent / ThreadPoolExecutorTest.java
1 /*
2  * Copyright (c) 2014 Brocade Communications Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.yangtools.util.concurrent;
9
10 import static org.junit.Assert.assertTrue;
11 import static org.junit.Assert.fail;
12
13 import com.google.common.base.Stopwatch;
14 import java.util.Map;
15 import java.util.concurrent.ConcurrentHashMap;
16 import java.util.concurrent.ConcurrentMap;
17 import java.util.concurrent.CountDownLatch;
18 import java.util.concurrent.ExecutorService;
19 import java.util.concurrent.RejectedExecutionException;
20 import java.util.concurrent.TimeUnit;
21 import java.util.concurrent.atomic.AtomicLong;
22 import java.util.concurrent.atomic.AtomicReference;
23 import org.junit.After;
24 import org.junit.Test;
25 import org.slf4j.Logger;
26 import org.slf4j.LoggerFactory;
27
28 /**
29  * Tests various ThreadPoolExecutor implementations.
30  *
31  * @author Thomas Pantelis
32  */
33 public class ThreadPoolExecutorTest {
34     private static final Logger LOG = LoggerFactory.getLogger(ThreadPoolExecutorTest.class);
35
36     private ExecutorService executor;
37
38     @After
39     public void tearDown() {
40         if (executor != null) {
41             executor.shutdownNow();
42         }
43     }
44
45     @Test
46     public void testFastThreadPoolExecution() throws InterruptedException {
47         testThreadPoolExecution(
48                 SpecialExecutors.newBoundedFastThreadPool(50, 100000, "TestPool", getClass()), 100000, "TestPool", 0);
49     }
50
51     @Test(expected = RejectedExecutionException.class)
52     public void testFastThreadPoolRejectingTask() throws InterruptedException {
53         executor = SpecialExecutors.newBoundedFastThreadPool(1, 1, "TestPool", getClass());
54
55         for (int i = 0; i < 5; i++) {
56             executor.execute(new Task(null, null, null, null, TimeUnit.MICROSECONDS.convert(5, TimeUnit.SECONDS)));
57         }
58     }
59
60     @Test
61     public void testBlockingFastThreadPoolExecution() throws InterruptedException {
62         // With a queue capacity of 1, it should block at some point.
63         testThreadPoolExecution(
64                 SpecialExecutors.newBlockingBoundedFastThreadPool(2, 1, "TestPool", getClass()), 1000, null, 10);
65     }
66
67     @Test
68     public void testCachedThreadPoolExecution() throws InterruptedException {
69         testThreadPoolExecution(SpecialExecutors.newBoundedCachedThreadPool(10, 100000, "TestPool", getClass()),
70                 100000, "TestPool", 0);
71     }
72
73     @Test(expected = RejectedExecutionException.class)
74     public void testCachedThreadRejectingTask() throws InterruptedException {
75         ExecutorService localExecutor = SpecialExecutors.newBoundedCachedThreadPool(1, 1, "TestPool", getClass());
76
77         for (int i = 0; i < 5; i++) {
78             localExecutor.execute(new Task(null, null, null, null, TimeUnit.MICROSECONDS.convert(5, TimeUnit.SECONDS)));
79         }
80     }
81
82     @Test
83     public void testBlockingCachedThreadPoolExecution() throws InterruptedException {
84         testThreadPoolExecution(
85                 SpecialExecutors.newBlockingBoundedCachedThreadPool(2, 1, "TestPool", getClass()), 1000, null, 10);
86     }
87
88     void testThreadPoolExecution(final ExecutorService executorToTest, final int numTasksToRun,
89             final String expThreadPrefix, final long taskDelay) throws InterruptedException {
90
91         this.executor = executorToTest;
92
93         LOG.debug("Testing {} with {} tasks.", executorToTest.getClass().getSimpleName(), numTasksToRun);
94
95         final CountDownLatch tasksRunLatch = new CountDownLatch(numTasksToRun);
96         final ConcurrentMap<Thread, AtomicLong> taskCountPerThread = new ConcurrentHashMap<>();
97         final AtomicReference<AssertionError> threadError = new AtomicReference<>();
98
99         Stopwatch stopWatch = Stopwatch.createStarted();
100
101         new Thread() {
102             @Override
103             public void run() {
104                 for (int i = 0; i < numTasksToRun; i++) {
105 //                    if (i%100 == 0) {
106 //                        Uninterruptibles.sleepUninterruptibly(20, TimeUnit.MICROSECONDS);
107 //                    }
108
109                     executorToTest.execute(new Task(tasksRunLatch, taskCountPerThread, threadError, expThreadPrefix,
110                         taskDelay));
111                 }
112             }
113         }.start();
114
115         boolean done = tasksRunLatch.await(15, TimeUnit.SECONDS);
116
117         stopWatch.stop();
118
119         if (!done) {
120             fail(numTasksToRun - tasksRunLatch.getCount() + " tasks out of " + numTasksToRun + " executed");
121         }
122
123         if (threadError.get() != null) {
124             throw threadError.get();
125         }
126
127         LOG.debug("{} threads used:", taskCountPerThread.size());
128         for (Map.Entry<Thread, AtomicLong> e : taskCountPerThread.entrySet()) {
129             LOG.debug("  {} - {} tasks", e.getKey().getName(), e.getValue());
130         }
131
132         LOG.debug("{}", executorToTest);
133         LOG.debug("Elapsed time: {}", stopWatch);
134     }
135
136     static class Task implements Runnable {
137         final CountDownLatch tasksRunLatch;
138         final CountDownLatch blockLatch;
139         final ConcurrentMap<Thread, AtomicLong> taskCountPerThread;
140         final AtomicReference<AssertionError> threadError;
141         final String expThreadPrefix;
142         final long delay;
143
144         Task(final CountDownLatch tasksRunLatch, final ConcurrentMap<Thread, AtomicLong> taskCountPerThread,
145                 final AtomicReference<AssertionError> threadError, final String expThreadPrefix, final long delay) {
146             this.tasksRunLatch = tasksRunLatch;
147             this.taskCountPerThread = taskCountPerThread;
148             this.threadError = threadError;
149             this.expThreadPrefix = expThreadPrefix;
150             this.delay = delay;
151             blockLatch = null;
152         }
153
154         Task(final CountDownLatch tasksRunLatch, final CountDownLatch blockLatch) {
155             this.tasksRunLatch = tasksRunLatch;
156             this.blockLatch = blockLatch;
157             this.taskCountPerThread = null;
158             this.threadError = null;
159             this.expThreadPrefix = null;
160             this.delay = 0;
161         }
162
163         @Override
164         public void run() {
165             try {
166                 try {
167                     if (delay > 0) {
168                         TimeUnit.MICROSECONDS.sleep(delay);
169                     } else if (blockLatch != null) {
170                         blockLatch.await();
171                     }
172                 } catch (InterruptedException e) {
173                     // Ignored
174                 }
175
176                 if (expThreadPrefix != null) {
177                     assertTrue("Thread name starts with " + expThreadPrefix,
178                             Thread.currentThread().getName().startsWith(expThreadPrefix));
179                 }
180
181                 if (taskCountPerThread != null) {
182                     AtomicLong count = taskCountPerThread.get(Thread.currentThread());
183                     if (count == null) {
184                         count = new AtomicLong(0);
185                         AtomicLong prev = taskCountPerThread.putIfAbsent(Thread.currentThread(), count);
186                         if (prev != null) {
187                             count = prev;
188                         }
189                     }
190
191                     count.incrementAndGet();
192                 }
193
194             } catch (AssertionError e) {
195                 if (threadError != null) {
196                     threadError.set(e);
197                 }
198             } finally {
199                 if (tasksRunLatch != null) {
200                     tasksRunLatch.countDown();
201                 }
202             }
203         }
204     }
205 }