44f90f9884d927df4a20fb939a4ecb4e000334c1
[yangtools.git] / common / util / src / test / java / org / opendaylight / yangtools / util / concurrent / ThreadPoolExecutorTest.java
1 /*
2  * Copyright (c) 2014 Brocade Communications Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.yangtools.util.concurrent;
9
10 import static org.junit.Assert.assertEquals;
11 import static org.junit.Assert.fail;
12
13 import com.google.common.base.Stopwatch;
14 import java.util.Map;
15 import java.util.concurrent.ConcurrentHashMap;
16 import java.util.concurrent.ConcurrentMap;
17 import java.util.concurrent.CountDownLatch;
18 import java.util.concurrent.ExecutorService;
19 import java.util.concurrent.RejectedExecutionException;
20 import java.util.concurrent.TimeUnit;
21 import java.util.concurrent.atomic.AtomicLong;
22 import java.util.concurrent.atomic.AtomicReference;
23 import org.junit.After;
24 import org.junit.Test;
25
26 /**
27  * Tests various ThreadPoolExecutor implementations.
28  *
29  * @author Thomas Pantelis
30  */
31 public class ThreadPoolExecutorTest {
32
33     private ExecutorService executor;
34
35     @After
36     public void tearDown() {
37         if (executor != null) {
38             executor.shutdownNow();
39         }
40     }
41
42     @Test
43     public void testFastThreadPoolExecution() throws Exception {
44
45         testThreadPoolExecution(
46                 SpecialExecutors.newBoundedFastThreadPool( 50, 100000, "TestPool" ),
47                 100000, "TestPool", 0 );
48     }
49
50     @Test(expected = RejectedExecutionException.class)
51     public void testFastThreadPoolRejectingTask() throws Exception {
52
53         executor = SpecialExecutors.newBoundedFastThreadPool( 1, 1, "TestPool" );
54
55         for (int i = 0; i < 5; i++) {
56             executor.execute( new Task( null, null, null, null,
57                     TimeUnit.MICROSECONDS.convert( 5, TimeUnit.SECONDS ) ) );
58         }
59     }
60
61     @Test
62     public void testBlockingFastThreadPoolExecution() throws Exception {
63
64         // With a queue capacity of 1, it should block at some point.
65         testThreadPoolExecution(
66                 SpecialExecutors.newBlockingBoundedFastThreadPool( 2, 1, "TestPool" ),
67                 1000, null, 10 );
68     }
69
70     @Test
71     public void testCachedThreadPoolExecution() throws Exception {
72
73         testThreadPoolExecution(
74                 SpecialExecutors.newBoundedCachedThreadPool( 10, 100000, "TestPool" ),
75                 100000, "TestPool", 0 );
76     }
77
78     @Test(expected = RejectedExecutionException.class)
79     public void testCachedThreadRejectingTask() throws Exception {
80
81         ExecutorService executor = SpecialExecutors.newBoundedCachedThreadPool( 1, 1, "TestPool" );
82
83         for (int i = 0; i < 5; i++) {
84             executor.execute( new Task( null, null, null, null,
85                     TimeUnit.MICROSECONDS.convert( 5, TimeUnit.SECONDS ) ) );
86         }
87     }
88
89     @Test
90     public void testBlockingCachedThreadPoolExecution() throws Exception {
91
92         testThreadPoolExecution(
93                 SpecialExecutors.newBlockingBoundedCachedThreadPool( 2, 1, "TestPool" ),
94                 1000, null, 10 );
95     }
96
97     void testThreadPoolExecution( final ExecutorService executor,
98             final int numTasksToRun, final String expThreadPrefix, final long taskDelay ) throws Exception {
99
100         this.executor = executor;
101
102         System.out.println("\nTesting " + executor.getClass().getSimpleName() + " with " + numTasksToRun + " tasks.");
103
104         final CountDownLatch tasksRunLatch = new CountDownLatch( numTasksToRun );
105         final ConcurrentMap<Thread, AtomicLong> taskCountPerThread = new ConcurrentHashMap<>();
106         final AtomicReference<AssertionError> threadError = new AtomicReference<>();
107
108         Stopwatch stopWatch = Stopwatch.createStarted();
109
110         new Thread() {
111             @Override
112             public void run() {
113                 for (int i = 0; i < numTasksToRun; i++) {
114 //                    if (i%100 == 0) {
115 //                        Uninterruptibles.sleepUninterruptibly( 20, TimeUnit.MICROSECONDS );
116 //                    }
117
118                     executor.execute( new Task( tasksRunLatch, taskCountPerThread,
119                                                 threadError, expThreadPrefix, taskDelay ) );
120                 }
121             }
122         }.start();
123
124         boolean done = tasksRunLatch.await( 15, TimeUnit.SECONDS );
125
126         stopWatch.stop();
127
128         if (!done) {
129             fail((numTasksToRun - tasksRunLatch.getCount()) + " tasks out of " + numTasksToRun + " executed");
130         }
131
132         if (threadError.get() != null) {
133             throw threadError.get();
134         }
135
136         System.out.println( taskCountPerThread.size() + " threads used:" );
137         for (Map.Entry<Thread, AtomicLong> e : taskCountPerThread.entrySet()) {
138             System.out.println( "  " + e.getKey().getName() + " - " + e.getValue() + " tasks" );
139         }
140
141         System.out.println( "\n" + executor );
142         System.out.println( "\nElapsed time: " + stopWatch );
143         System.out.println();
144     }
145
146     static class Task implements Runnable {
147         final CountDownLatch tasksRunLatch;
148         final CountDownLatch blockLatch;
149         final ConcurrentMap<Thread, AtomicLong> taskCountPerThread;
150         final AtomicReference<AssertionError> threadError;
151         final String expThreadPrefix;
152         final long delay;
153
154         Task( CountDownLatch tasksRunLatch, ConcurrentMap<Thread, AtomicLong> taskCountPerThread,
155                 AtomicReference<AssertionError> threadError, String expThreadPrefix, long delay ) {
156             this.tasksRunLatch = tasksRunLatch;
157             this.taskCountPerThread = taskCountPerThread;
158             this.threadError = threadError;
159             this.expThreadPrefix = expThreadPrefix;
160             this.delay = delay;
161             blockLatch = null;
162         }
163
164         Task( CountDownLatch tasksRunLatch, CountDownLatch blockLatch ) {
165             this.tasksRunLatch = tasksRunLatch;
166             this.blockLatch = blockLatch;
167             this.taskCountPerThread = null;
168             this.threadError = null;
169             this.expThreadPrefix = null;
170             this.delay = 0;
171         }
172
173         @Override
174         public void run() {
175             try {
176                 try {
177                     if (delay > 0) {
178                         TimeUnit.MICROSECONDS.sleep( delay );
179                     } else if (blockLatch != null) {
180                         blockLatch.await();
181                     }
182                 } catch (InterruptedException e) {
183                 }
184
185                 if (expThreadPrefix != null) {
186                     assertEquals( "Thread name starts with " + expThreadPrefix, true,
187                             Thread.currentThread().getName().startsWith( expThreadPrefix ) );
188                 }
189
190                 if (taskCountPerThread != null) {
191                     AtomicLong count = taskCountPerThread.get( Thread.currentThread() );
192                     if (count == null) {
193                         count = new AtomicLong( 0 );
194                         AtomicLong prev = taskCountPerThread.putIfAbsent( Thread.currentThread(), count );
195                         if (prev != null) {
196                             count = prev;
197                         }
198                     }
199
200                     count.incrementAndGet();
201                 }
202
203             } catch (AssertionError e) {
204                 if (threadError != null) {
205                     threadError.set( e );
206                 }
207             } finally {
208                 if (tasksRunLatch != null) {
209                     tasksRunLatch.countDown();
210                 }
211             }
212         }
213     }
214 }