*/
package org.opendaylight.yangtools.util.concurrent;
+import com.google.common.base.MoreObjects;
+import com.google.common.base.MoreObjects.ToStringHelper;
+import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLong;
-
-import com.google.common.base.Objects;
-import com.google.common.base.Objects.ToStringHelper;
-import com.google.common.base.Preconditions;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
/**
* A ThreadPoolExecutor with a specified bounded queue capacity that favors reusing previously
private static final long IDLE_TIMEOUT_IN_SEC = 60L;
- private final AtomicLong largestBackingQueueSize = new AtomicLong( 0 );
-
private final ExecutorQueue executorQueue;
private final String threadPrefix;
* @param threadPrefix
* the name prefix for threads created by this executor.
*/
- public CachedThreadPoolExecutor( int maximumPoolSize, int maximumQueueSize, String threadPrefix ) {
+ public CachedThreadPoolExecutor( final int maximumPoolSize, final int maximumQueueSize, final String threadPrefix ) {
// We're using a custom SynchronousQueue that has a backing bounded LinkedBlockingQueue.
// We don't specify any core threads (first parameter) so, when a task is submitted,
// the base class will always try to offer to the queue. If there is an existing waiting
executorQueue = (ExecutorQueue)super.getQueue();
rejectedTaskHandler = new RejectedTaskHandler(
- executorQueue.getBackingQueue(), largestBackingQueueSize );
+ executorQueue.getBackingQueue(), CountingRejectedExecutionHandler.newAbortPolicy() );
super.setRejectedExecutionHandler( rejectedTaskHandler );
}
@Override
- public void setRejectedExecutionHandler( RejectedExecutionHandler handler ) {
+ public void setRejectedExecutionHandler( final RejectedExecutionHandler handler ) {
+ Preconditions.checkNotNull( handler );
rejectedTaskHandler.setDelegateRejectedExecutionHandler( handler );
}
@Override
- public BlockingQueue<Runnable> getQueue(){
+ public RejectedExecutionHandler getRejectedExecutionHandler() {
+ return rejectedTaskHandler.getDelegateRejectedExecutionHandler();
+ }
+
+ @Override
+ public BlockingQueue<Runnable> getQueue() {
return executorQueue.getBackingQueue();
}
public long getLargestQueueSize() {
- return largestBackingQueueSize.get();
+ return ((TrackingLinkedBlockingQueue<?>)executorQueue.getBackingQueue()).getLargestQueueSize();
}
- protected ToStringHelper addToStringAttributes( ToStringHelper toStringHelper ) {
+ protected ToStringHelper addToStringAttributes( final ToStringHelper toStringHelper ) {
return toStringHelper;
}
@Override
public final String toString() {
- return addToStringAttributes( Objects.toStringHelper( this )
+ return addToStringAttributes( MoreObjects.toStringHelper( this )
.add( "Thread Prefix", threadPrefix )
.add( "Current Thread Pool Size", getPoolSize() )
.add( "Largest Thread Pool Size", getLargestPoolSize() )
private final LinkedBlockingQueue<Runnable> backingQueue;
- ExecutorQueue( int maxBackingQueueSize ) {
- backingQueue = new LinkedBlockingQueue<>( maxBackingQueueSize );
+ ExecutorQueue( final int maxBackingQueueSize ) {
+ backingQueue = new TrackingLinkedBlockingQueue<>( maxBackingQueueSize );
}
LinkedBlockingQueue<Runnable> getBackingQueue() {
}
@Override
- public Runnable poll( long timeout, TimeUnit unit ) throws InterruptedException {
+ public Runnable poll( final long timeout, final TimeUnit unit ) throws InterruptedException {
long totalWaitTime = unit.toMillis( timeout );
long waitTime = Math.min( totalWaitTime, POLL_WAIT_TIME_IN_MS );
Runnable task = null;
// periods, one thread will eventually wake up and get the task from the backingQueue
// and execute it, although slightly delayed.
- while( task == null ) {
+ while (task == null) {
// First try to get a task from the backing queue.
task = backingQueue.poll();
- if( task == null ) {
+ if (task == null) {
// No task in backing - call the base class to wait for one to be offered.
task = super.poll( waitTime, TimeUnit.MILLISECONDS );
totalWaitTime -= POLL_WAIT_TIME_IN_MS;
- if( totalWaitTime <= 0 ) {
+ if (totalWaitTime <= 0 ) {
break;
}
private static class RejectedTaskHandler implements RejectedExecutionHandler {
private final LinkedBlockingQueue<Runnable> backingQueue;
- private final AtomicLong largestBackingQueueSize;
private volatile RejectedExecutionHandler delegateRejectedExecutionHandler;
- RejectedTaskHandler( LinkedBlockingQueue<Runnable> backingQueue,
- AtomicLong largestBackingQueueSize ) {
+ RejectedTaskHandler( final LinkedBlockingQueue<Runnable> backingQueue,
+ final RejectedExecutionHandler delegateRejectedExecutionHandler ) {
this.backingQueue = backingQueue;
- this.largestBackingQueueSize = largestBackingQueueSize;
+ this.delegateRejectedExecutionHandler = delegateRejectedExecutionHandler;
}
void setDelegateRejectedExecutionHandler(
- RejectedExecutionHandler delegateRejectedExecutionHandler ){
+ final RejectedExecutionHandler delegateRejectedExecutionHandler ) {
this.delegateRejectedExecutionHandler = delegateRejectedExecutionHandler;
}
+ RejectedExecutionHandler getDelegateRejectedExecutionHandler(){
+ return delegateRejectedExecutionHandler;
+ }
+
@Override
- public void rejectedExecution( Runnable task, ThreadPoolExecutor executor ) {
- if( executor.isShutdown() ) {
+ public void rejectedExecution( final Runnable task, final ThreadPoolExecutor executor ) {
+ if (executor.isShutdown()) {
throw new RejectedExecutionException( "Executor has been shutdown." );
}
- if( !backingQueue.offer( task ) ) {
- if( delegateRejectedExecutionHandler != null ) {
- delegateRejectedExecutionHandler.rejectedExecution( task, executor );
- } else {
- throw new RejectedExecutionException(
- "All threads are in use and the queue is full" );
- }
- }
-
- largestBackingQueueSize.incrementAndGet();
- long size = backingQueue.size();
- long largest = largestBackingQueueSize.get();
- if( size > largest ) {
- largestBackingQueueSize.compareAndSet( largest, size );
+ if (!backingQueue.offer(task)) {
+ delegateRejectedExecutionHandler.rejectedExecution( task, executor );
}
}
}