@After
public void tearDown() {
- if( executor != null ) {
+ if (executor != null) {
executor.shutdownNow();
}
}
executor = SpecialExecutors.newBoundedFastThreadPool( 1, 1, "TestPool" );
- for( int i = 0; i < 5; i++ ) {
+ for (int i = 0; i < 5; i++) {
executor.execute( new Task( null, null, null, null,
TimeUnit.MICROSECONDS.convert( 5, TimeUnit.SECONDS ) ) );
}
ExecutorService executor = SpecialExecutors.newBoundedCachedThreadPool( 1, 1, "TestPool" );
- for( int i = 0; i < 5; i++ ) {
+ for (int i = 0; i < 5; i++) {
executor.execute( new Task( null, null, null, null,
TimeUnit.MICROSECONDS.convert( 5, TimeUnit.SECONDS ) ) );
}
final ConcurrentMap<Thread, AtomicLong> taskCountPerThread = new ConcurrentHashMap<>();
final AtomicReference<AssertionError> threadError = new AtomicReference<>();
- Stopwatch stopWatch = new Stopwatch();
- stopWatch.start();
+ Stopwatch stopWatch = Stopwatch.createStarted();
new Thread() {
@Override
public void run() {
- for( int i = 0; i < numTasksToRun; i++ ) {
-// if(i%100 == 0) {
+ for (int i = 0; i < numTasksToRun; i++) {
+// if (i%100 == 0) {
// Uninterruptibles.sleepUninterruptibly( 20, TimeUnit.MICROSECONDS );
// }
stopWatch.stop();
- if( !done ) {
+ if (!done) {
fail( (numTasksToRun - tasksRunLatch.getCount()) + " tasks out of " +
numTasksToRun + " executed" );
}
- if( threadError.get() != null ) {
+ if (threadError.get() != null) {
throw threadError.get();
}
System.out.println( taskCountPerThread.size() + " threads used:" );
- for( Map.Entry<Thread, AtomicLong> e : taskCountPerThread.entrySet() ) {
+ for (Map.Entry<Thread, AtomicLong> e : taskCountPerThread.entrySet()) {
System.out.println( " " + e.getKey().getName() + " - " + e.getValue() + " tasks" );
}
System.out.println();
}
- private static class Task implements Runnable {
+ static class Task implements Runnable {
final CountDownLatch tasksRunLatch;
+ final CountDownLatch blockLatch;
final ConcurrentMap<Thread, AtomicLong> taskCountPerThread;
final AtomicReference<AssertionError> threadError;
final String expThreadPrefix;
this.threadError = threadError;
this.expThreadPrefix = expThreadPrefix;
this.delay = delay;
+ blockLatch = null;
+ }
+
+ Task( CountDownLatch tasksRunLatch, CountDownLatch blockLatch ) {
+ this.tasksRunLatch = tasksRunLatch;
+ this.blockLatch = blockLatch;
+ this.taskCountPerThread = null;
+ this.threadError = null;
+ this.expThreadPrefix = null;
+ this.delay = 0;
}
@Override
public void run() {
try {
- if( delay > 0 ) {
- try {
+ try {
+ if (delay > 0) {
TimeUnit.MICROSECONDS.sleep( delay );
- } catch( InterruptedException e ) {}
- }
+ } else if (blockLatch != null) {
+ blockLatch.await();
+ }
+ } catch( InterruptedException e ) {}
- if( expThreadPrefix != null ) {
+ if (expThreadPrefix != null) {
assertEquals( "Thread name starts with " + expThreadPrefix, true,
Thread.currentThread().getName().startsWith( expThreadPrefix ) );
}
- if( taskCountPerThread != null ) {
+ if (taskCountPerThread != null) {
AtomicLong count = taskCountPerThread.get( Thread.currentThread() );
- if( count == null ) {
+ if (count == null) {
count = new AtomicLong( 0 );
AtomicLong prev = taskCountPerThread.putIfAbsent( Thread.currentThread(), count );
- if( prev != null ) {
+ if (prev != null) {
count = prev;
}
}
}
} catch( AssertionError e ) {
- if( threadError != null ) {
+ if (threadError != null) {
threadError.set( e );
}
} finally {
- if( tasksRunLatch != null ) {
+ if (tasksRunLatch != null) {
tasksRunLatch.countDown();
}
}