import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLong;
-
import com.google.common.base.Objects;
import com.google.common.base.Objects.ToStringHelper;
import com.google.common.base.Preconditions;
private static final long IDLE_TIMEOUT_IN_SEC = 60L;
- private final AtomicLong largestBackingQueueSize = new AtomicLong( 0 );
-
private final ExecutorQueue executorQueue;
private final String threadPrefix;
executorQueue = (ExecutorQueue)super.getQueue();
rejectedTaskHandler = new RejectedTaskHandler(
- executorQueue.getBackingQueue(), largestBackingQueueSize );
+ executorQueue.getBackingQueue(), CountingRejectedExecutionHandler.newAbortPolicy() );
super.setRejectedExecutionHandler( rejectedTaskHandler );
}
@Override
public void setRejectedExecutionHandler( RejectedExecutionHandler handler ) {
+ Preconditions.checkNotNull( handler );
rejectedTaskHandler.setDelegateRejectedExecutionHandler( handler );
}
+ @Override
+ public RejectedExecutionHandler getRejectedExecutionHandler(){
+ return rejectedTaskHandler.getDelegateRejectedExecutionHandler();
+ }
+
@Override
public BlockingQueue<Runnable> getQueue(){
return executorQueue.getBackingQueue();
}
public long getLargestQueueSize() {
- return largestBackingQueueSize.get();
+ return ((TrackingLinkedBlockingQueue<?>)executorQueue.getBackingQueue()).getLargestQueueSize();
}
protected ToStringHelper addToStringAttributes( ToStringHelper toStringHelper ) {
private final LinkedBlockingQueue<Runnable> backingQueue;
ExecutorQueue( int maxBackingQueueSize ) {
- backingQueue = new LinkedBlockingQueue<>( maxBackingQueueSize );
+ backingQueue = new TrackingLinkedBlockingQueue<>( maxBackingQueueSize );
}
LinkedBlockingQueue<Runnable> getBackingQueue() {
private static class RejectedTaskHandler implements RejectedExecutionHandler {
private final LinkedBlockingQueue<Runnable> backingQueue;
- private final AtomicLong largestBackingQueueSize;
private volatile RejectedExecutionHandler delegateRejectedExecutionHandler;
RejectedTaskHandler( LinkedBlockingQueue<Runnable> backingQueue,
- AtomicLong largestBackingQueueSize ) {
+ RejectedExecutionHandler delegateRejectedExecutionHandler ) {
this.backingQueue = backingQueue;
- this.largestBackingQueueSize = largestBackingQueueSize;
+ this.delegateRejectedExecutionHandler = delegateRejectedExecutionHandler;
}
void setDelegateRejectedExecutionHandler(
- RejectedExecutionHandler delegateRejectedExecutionHandler ){
+ RejectedExecutionHandler delegateRejectedExecutionHandler ) {
this.delegateRejectedExecutionHandler = delegateRejectedExecutionHandler;
}
+ RejectedExecutionHandler getDelegateRejectedExecutionHandler(){
+ return delegateRejectedExecutionHandler;
+ }
+
@Override
public void rejectedExecution( Runnable task, ThreadPoolExecutor executor ) {
if( executor.isShutdown() ) {
}
if( !backingQueue.offer( task ) ) {
- if( delegateRejectedExecutionHandler != null ) {
- delegateRejectedExecutionHandler.rejectedExecution( task, executor );
- } else {
- throw new RejectedExecutionException(
- "All threads are in use and the queue is full" );
- }
- }
-
- largestBackingQueueSize.incrementAndGet();
- long size = backingQueue.size();
- long largest = largestBackingQueueSize.get();
- if( size > largest ) {
- largestBackingQueueSize.compareAndSet( largest, size );
+ delegateRejectedExecutionHandler.rejectedExecution( task, executor );
}
}
}