Organize Imports to be Checkstyle compliant in utils
[yangtools.git] / common / util / src / test / java / org / opendaylight / yangtools / util / concurrent / ThreadPoolExecutorTest.java
1 /*
2  * Copyright (c) 2014 Brocade Communications Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.yangtools.util.concurrent;
9
10 import static org.junit.Assert.assertEquals;
11 import static org.junit.Assert.fail;
12
13 import com.google.common.base.Stopwatch;
14 import java.util.Map;
15 import java.util.concurrent.ConcurrentHashMap;
16 import java.util.concurrent.ConcurrentMap;
17 import java.util.concurrent.CountDownLatch;
18 import java.util.concurrent.ExecutorService;
19 import java.util.concurrent.RejectedExecutionException;
20 import java.util.concurrent.TimeUnit;
21 import java.util.concurrent.atomic.AtomicLong;
22 import java.util.concurrent.atomic.AtomicReference;
23 import org.junit.After;
24 import org.junit.Test;
25
26 /**
27  * Tests various ThreadPoolExecutor implementations.
28  *
29  * @author Thomas Pantelis
30  */
31 public class ThreadPoolExecutorTest {
32
33     private ExecutorService executor;
34
35     @After
36     public void tearDown() {
37         if (executor != null) {
38             executor.shutdownNow();
39         }
40     }
41
42     @Test
43     public void testFastThreadPoolExecution() throws Exception {
44
45         testThreadPoolExecution(
46                 SpecialExecutors.newBoundedFastThreadPool( 50, 100000, "TestPool" ),
47                 100000, "TestPool", 0 );
48     }
49
50     @Test(expected=RejectedExecutionException.class)
51     public void testFastThreadPoolRejectingTask() throws Exception {
52
53         executor = SpecialExecutors.newBoundedFastThreadPool( 1, 1, "TestPool" );
54
55         for (int i = 0; i < 5; i++) {
56             executor.execute( new Task( null, null, null, null,
57                     TimeUnit.MICROSECONDS.convert( 5, TimeUnit.SECONDS ) ) );
58         }
59     }
60
61     @Test
62     public void testBlockingFastThreadPoolExecution() throws Exception {
63
64         // With a queue capacity of 1, it should block at some point.
65         testThreadPoolExecution(
66                 SpecialExecutors.newBlockingBoundedFastThreadPool( 2, 1, "TestPool" ),
67                 1000, null, 10 );
68     }
69
70     @Test
71     public void testCachedThreadPoolExecution() throws Exception {
72
73         testThreadPoolExecution(
74                 SpecialExecutors.newBoundedCachedThreadPool( 10, 100000, "TestPool" ),
75                 100000, "TestPool", 0 );
76     }
77
78     @Test(expected=RejectedExecutionException.class)
79     public void testCachedThreadRejectingTask() throws Exception {
80
81         ExecutorService executor = SpecialExecutors.newBoundedCachedThreadPool( 1, 1, "TestPool" );
82
83         for (int i = 0; i < 5; i++) {
84             executor.execute( new Task( null, null, null, null,
85                     TimeUnit.MICROSECONDS.convert( 5, TimeUnit.SECONDS ) ) );
86         }
87     }
88
89     @Test
90     public void testBlockingCachedThreadPoolExecution() throws Exception {
91
92         testThreadPoolExecution(
93                 SpecialExecutors.newBlockingBoundedCachedThreadPool( 2, 1, "TestPool" ),
94                 1000, null, 10 );
95     }
96
97     void testThreadPoolExecution( final ExecutorService executor,
98             final int numTasksToRun, final String expThreadPrefix, final long taskDelay ) throws Exception {
99
100         this.executor = executor;
101
102         System.out.println( "\nTesting " + executor.getClass().getSimpleName() + " with " +
103                 numTasksToRun + " tasks." );
104
105         final CountDownLatch tasksRunLatch = new CountDownLatch( numTasksToRun );
106         final ConcurrentMap<Thread, AtomicLong> taskCountPerThread = new ConcurrentHashMap<>();
107         final AtomicReference<AssertionError> threadError = new AtomicReference<>();
108
109         Stopwatch stopWatch = Stopwatch.createStarted();
110
111         new Thread() {
112             @Override
113             public void run() {
114                 for (int i = 0; i < numTasksToRun; i++) {
115 //                    if (i%100 == 0) {
116 //                        Uninterruptibles.sleepUninterruptibly( 20, TimeUnit.MICROSECONDS );
117 //                    }
118
119                     executor.execute( new Task( tasksRunLatch, taskCountPerThread,
120                                                 threadError, expThreadPrefix, taskDelay ) );
121                 }
122             }
123         }.start();
124
125         boolean done = tasksRunLatch.await( 15, TimeUnit.SECONDS );
126
127         stopWatch.stop();
128
129         if (!done) {
130             fail( (numTasksToRun - tasksRunLatch.getCount()) + " tasks out of " +
131                    numTasksToRun + " executed" );
132         }
133
134         if (threadError.get() != null) {
135             throw threadError.get();
136         }
137
138         System.out.println( taskCountPerThread.size() + " threads used:" );
139         for (Map.Entry<Thread, AtomicLong> e : taskCountPerThread.entrySet()) {
140             System.out.println( "  " + e.getKey().getName() + " - " + e.getValue() + " tasks" );
141         }
142
143         System.out.println( "\n" + executor );
144         System.out.println( "\nElapsed time: " + stopWatch );
145         System.out.println();
146     }
147
148     static class Task implements Runnable {
149         final CountDownLatch tasksRunLatch;
150         final CountDownLatch blockLatch;
151         final ConcurrentMap<Thread, AtomicLong> taskCountPerThread;
152         final AtomicReference<AssertionError> threadError;
153         final String expThreadPrefix;
154         final long delay;
155
156         Task( CountDownLatch tasksRunLatch, ConcurrentMap<Thread, AtomicLong> taskCountPerThread,
157                 AtomicReference<AssertionError> threadError, String expThreadPrefix, long delay ) {
158             this.tasksRunLatch = tasksRunLatch;
159             this.taskCountPerThread = taskCountPerThread;
160             this.threadError = threadError;
161             this.expThreadPrefix = expThreadPrefix;
162             this.delay = delay;
163             blockLatch = null;
164         }
165
166         Task( CountDownLatch tasksRunLatch, CountDownLatch blockLatch ) {
167             this.tasksRunLatch = tasksRunLatch;
168             this.blockLatch = blockLatch;
169             this.taskCountPerThread = null;
170             this.threadError = null;
171             this.expThreadPrefix = null;
172             this.delay = 0;
173         }
174
175         @Override
176         public void run() {
177             try {
178                 try {
179                     if (delay > 0) {
180                         TimeUnit.MICROSECONDS.sleep( delay );
181                     } else if (blockLatch != null) {
182                         blockLatch.await();
183                     }
184                 } catch( InterruptedException e ) {}
185
186                 if (expThreadPrefix != null) {
187                     assertEquals( "Thread name starts with " + expThreadPrefix, true,
188                             Thread.currentThread().getName().startsWith( expThreadPrefix ) );
189                 }
190
191                 if (taskCountPerThread != null) {
192                     AtomicLong count = taskCountPerThread.get( Thread.currentThread() );
193                     if (count == null) {
194                         count = new AtomicLong( 0 );
195                         AtomicLong prev = taskCountPerThread.putIfAbsent( Thread.currentThread(), count );
196                         if (prev != null) {
197                             count = prev;
198                         }
199                     }
200
201                     count.incrementAndGet();
202                 }
203
204             } catch( AssertionError e ) {
205                 if (threadError != null) {
206                     threadError.set( e );
207                 }
208             } finally {
209                 if (tasksRunLatch != null) {
210                     tasksRunLatch.countDown();
211                 }
212             }
213         }
214     }
215 }