*/
package org.opendaylight.yangtools.util.concurrent;
-import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import com.google.common.base.Stopwatch;
import java.util.concurrent.atomic.AtomicReference;
import org.junit.After;
import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* Tests various ThreadPoolExecutor implementations.
* @author Thomas Pantelis
*/
public class ThreadPoolExecutorTest {
+ private static final Logger LOG = LoggerFactory.getLogger(ThreadPoolExecutorTest.class);
private ExecutorService executor;
}
@Test
- public void testFastThreadPoolExecution() throws Exception {
-
- testThreadPoolExecution(
- SpecialExecutors.newBoundedFastThreadPool( 50, 100000, "TestPool" ),
- 100000, "TestPool", 0 );
+ public void testFastThreadPoolExecution() throws InterruptedException {
+ testThreadPoolExecution(SpecialExecutors.newBoundedFastThreadPool(50, 100000, "TestPool"), 100000, "TestPool",
+ 0);
}
@Test(expected = RejectedExecutionException.class)
- public void testFastThreadPoolRejectingTask() throws Exception {
-
- executor = SpecialExecutors.newBoundedFastThreadPool( 1, 1, "TestPool" );
+ public void testFastThreadPoolRejectingTask() throws InterruptedException {
+ executor = SpecialExecutors.newBoundedFastThreadPool(1, 1, "TestPool");
for (int i = 0; i < 5; i++) {
- executor.execute( new Task( null, null, null, null,
- TimeUnit.MICROSECONDS.convert( 5, TimeUnit.SECONDS ) ) );
+ executor.execute(new Task(null, null, null, null, TimeUnit.MICROSECONDS.convert(5, TimeUnit.SECONDS)));
}
}
@Test
- public void testBlockingFastThreadPoolExecution() throws Exception {
-
+ public void testBlockingFastThreadPoolExecution() throws InterruptedException {
// With a queue capacity of 1, it should block at some point.
- testThreadPoolExecution(
- SpecialExecutors.newBlockingBoundedFastThreadPool( 2, 1, "TestPool" ),
- 1000, null, 10 );
+ testThreadPoolExecution(SpecialExecutors.newBlockingBoundedFastThreadPool(2, 1, "TestPool"), 1000, null, 10);
}
@Test
- public void testCachedThreadPoolExecution() throws Exception {
-
- testThreadPoolExecution(
- SpecialExecutors.newBoundedCachedThreadPool( 10, 100000, "TestPool" ),
- 100000, "TestPool", 0 );
+ public void testCachedThreadPoolExecution() throws InterruptedException {
+ testThreadPoolExecution(SpecialExecutors.newBoundedCachedThreadPool(10, 100000, "TestPool"),
+ 100000, "TestPool", 0);
}
@Test(expected = RejectedExecutionException.class)
- public void testCachedThreadRejectingTask() throws Exception {
-
- ExecutorService executor = SpecialExecutors.newBoundedCachedThreadPool( 1, 1, "TestPool" );
+ public void testCachedThreadRejectingTask() throws InterruptedException {
+ ExecutorService executor = SpecialExecutors.newBoundedCachedThreadPool(1, 1, "TestPool");
for (int i = 0; i < 5; i++) {
- executor.execute( new Task( null, null, null, null,
- TimeUnit.MICROSECONDS.convert( 5, TimeUnit.SECONDS ) ) );
+ executor.execute(new Task(null, null, null, null, TimeUnit.MICROSECONDS.convert(5, TimeUnit.SECONDS)));
}
}
@Test
- public void testBlockingCachedThreadPoolExecution() throws Exception {
-
- testThreadPoolExecution(
- SpecialExecutors.newBlockingBoundedCachedThreadPool( 2, 1, "TestPool" ),
- 1000, null, 10 );
+ public void testBlockingCachedThreadPoolExecution() throws InterruptedException {
+ testThreadPoolExecution(SpecialExecutors.newBlockingBoundedCachedThreadPool(2, 1, "TestPool"), 1000, null, 10);
}
- void testThreadPoolExecution( final ExecutorService executor,
- final int numTasksToRun, final String expThreadPrefix, final long taskDelay ) throws Exception {
+ void testThreadPoolExecution(final ExecutorService executor, final int numTasksToRun, final String expThreadPrefix,
+ final long taskDelay) throws InterruptedException {
this.executor = executor;
- System.out.println("\nTesting " + executor.getClass().getSimpleName() + " with " + numTasksToRun + " tasks.");
+ LOG.debug("Testing {} with {} tasks.", executor.getClass().getSimpleName(), numTasksToRun);
- final CountDownLatch tasksRunLatch = new CountDownLatch( numTasksToRun );
+ final CountDownLatch tasksRunLatch = new CountDownLatch(numTasksToRun);
final ConcurrentMap<Thread, AtomicLong> taskCountPerThread = new ConcurrentHashMap<>();
final AtomicReference<AssertionError> threadError = new AtomicReference<>();
public void run() {
for (int i = 0; i < numTasksToRun; i++) {
// if (i%100 == 0) {
-// Uninterruptibles.sleepUninterruptibly( 20, TimeUnit.MICROSECONDS );
+// Uninterruptibles.sleepUninterruptibly(20, TimeUnit.MICROSECONDS);
// }
- executor.execute( new Task( tasksRunLatch, taskCountPerThread,
- threadError, expThreadPrefix, taskDelay ) );
+ executor.execute(new Task(tasksRunLatch, taskCountPerThread, threadError, expThreadPrefix,
+ taskDelay));
}
}
}.start();
- boolean done = tasksRunLatch.await( 15, TimeUnit.SECONDS );
+ boolean done = tasksRunLatch.await(15, TimeUnit.SECONDS);
stopWatch.stop();
if (!done) {
- fail((numTasksToRun - tasksRunLatch.getCount()) + " tasks out of " + numTasksToRun + " executed");
+ fail(numTasksToRun - tasksRunLatch.getCount() + " tasks out of " + numTasksToRun + " executed");
}
if (threadError.get() != null) {
throw threadError.get();
}
- System.out.println( taskCountPerThread.size() + " threads used:" );
+ LOG.debug("{} threads used:", taskCountPerThread.size());
for (Map.Entry<Thread, AtomicLong> e : taskCountPerThread.entrySet()) {
- System.out.println( " " + e.getKey().getName() + " - " + e.getValue() + " tasks" );
+ LOG.debug(" {} - {} tasks", e.getKey().getName(), e.getValue());
}
- System.out.println( "\n" + executor );
- System.out.println( "\nElapsed time: " + stopWatch );
- System.out.println();
+ LOG.debug("{}", executor);
+ LOG.debug("Elapsed time: {}", stopWatch);
}
static class Task implements Runnable {
final String expThreadPrefix;
final long delay;
- Task( CountDownLatch tasksRunLatch, ConcurrentMap<Thread, AtomicLong> taskCountPerThread,
- AtomicReference<AssertionError> threadError, String expThreadPrefix, long delay ) {
+ Task(final CountDownLatch tasksRunLatch, final ConcurrentMap<Thread, AtomicLong> taskCountPerThread,
+ final AtomicReference<AssertionError> threadError, final String expThreadPrefix, final long delay) {
this.tasksRunLatch = tasksRunLatch;
this.taskCountPerThread = taskCountPerThread;
this.threadError = threadError;
blockLatch = null;
}
- Task( CountDownLatch tasksRunLatch, CountDownLatch blockLatch ) {
+ Task(final CountDownLatch tasksRunLatch, final CountDownLatch blockLatch) {
this.tasksRunLatch = tasksRunLatch;
this.blockLatch = blockLatch;
this.taskCountPerThread = null;
try {
try {
if (delay > 0) {
- TimeUnit.MICROSECONDS.sleep( delay );
+ TimeUnit.MICROSECONDS.sleep(delay);
} else if (blockLatch != null) {
blockLatch.await();
}
} catch (InterruptedException e) {
+ // Ignored
}
if (expThreadPrefix != null) {
- assertEquals( "Thread name starts with " + expThreadPrefix, true,
- Thread.currentThread().getName().startsWith( expThreadPrefix ) );
+ assertTrue("Thread name starts with " + expThreadPrefix,
+ Thread.currentThread().getName().startsWith(expThreadPrefix));
}
if (taskCountPerThread != null) {
- AtomicLong count = taskCountPerThread.get( Thread.currentThread() );
+ AtomicLong count = taskCountPerThread.get(Thread.currentThread());
if (count == null) {
- count = new AtomicLong( 0 );
- AtomicLong prev = taskCountPerThread.putIfAbsent( Thread.currentThread(), count );
+ count = new AtomicLong(0);
+ AtomicLong prev = taskCountPerThread.putIfAbsent(Thread.currentThread(), count);
if (prev != null) {
count = prev;
}
} catch (AssertionError e) {
if (threadError != null) {
- threadError.set( e );
+ threadError.set(e);
}
} finally {
if (tasksRunLatch != null) {