// been reached, the task will be rejected. We specify a RejectedTaskHandler that tries
// to offer to the backing queue. If that succeeds, the task will execute as soon as a
// thread becomes available. If the offer fails to the backing queue, the task is rejected.
- super( 0, maximumPoolSize, IDLE_TIMEOUT_IN_SEC, TimeUnit.SECONDS,
- new ExecutorQueue( maximumQueueSize ) );
+ super(0, maximumPoolSize, IDLE_TIMEOUT_IN_SEC, TimeUnit.SECONDS,
+ new ExecutorQueue(maximumQueueSize));
- this.threadPrefix = Preconditions.checkNotNull( threadPrefix );
+ this.threadPrefix = Preconditions.checkNotNull(threadPrefix);
this.maximumQueueSize = maximumQueueSize;
- setThreadFactory( new ThreadFactoryBuilder().setDaemon( true )
- .setNameFormat( this.threadPrefix + "-%d" ).build() );
+ setThreadFactory(new ThreadFactoryBuilder().setDaemon(true)
+ .setNameFormat(this.threadPrefix + "-%d").build());
executorQueue = (ExecutorQueue)super.getQueue();
rejectedTaskHandler = new RejectedTaskHandler(
- executorQueue.getBackingQueue(), CountingRejectedExecutionHandler.newAbortPolicy() );
- super.setRejectedExecutionHandler( rejectedTaskHandler );
+ executorQueue.getBackingQueue(), CountingRejectedExecutionHandler.newAbortPolicy());
+ super.setRejectedExecutionHandler(rejectedTaskHandler);
}
@Override
- public void setRejectedExecutionHandler( final RejectedExecutionHandler handler ) {
- Preconditions.checkNotNull( handler );
- rejectedTaskHandler.setDelegateRejectedExecutionHandler( handler );
+ public void setRejectedExecutionHandler(final RejectedExecutionHandler handler) {
+ Preconditions.checkNotNull(handler);
+ rejectedTaskHandler.setDelegateRejectedExecutionHandler(handler);
}
@Override
return ((TrackingLinkedBlockingQueue<?>)executorQueue.getBackingQueue()).getLargestQueueSize();
}
- protected ToStringHelper addToStringAttributes( final ToStringHelper toStringHelper ) {
+ protected ToStringHelper addToStringAttributes(final ToStringHelper toStringHelper) {
return toStringHelper;
}
@Override
public final String toString() {
- return addToStringAttributes( MoreObjects.toStringHelper( this )
- .add( "Thread Prefix", threadPrefix )
- .add( "Current Thread Pool Size", getPoolSize() )
- .add( "Largest Thread Pool Size", getLargestPoolSize() )
- .add( "Max Thread Pool Size", getMaximumPoolSize() )
- .add( "Current Queue Size", executorQueue.getBackingQueue().size() )
- .add( "Largest Queue Size", getLargestQueueSize() )
- .add( "Max Queue Size", maximumQueueSize )
- .add( "Active Thread Count", getActiveCount() )
- .add( "Completed Task Count", getCompletedTaskCount() )
- .add( "Total Task Count", getTaskCount() ) ).toString();
+ return addToStringAttributes(MoreObjects.toStringHelper(this)
+ .add("Thread Prefix", threadPrefix)
+ .add("Current Thread Pool Size", getPoolSize())
+ .add("Largest Thread Pool Size", getLargestPoolSize())
+ .add("Max Thread Pool Size", getMaximumPoolSize())
+ .add("Current Queue Size", executorQueue.getBackingQueue().size())
+ .add("Largest Queue Size", getLargestQueueSize())
+ .add("Max Queue Size", maximumQueueSize)
+ .add("Active Thread Count", getActiveCount())
+ .add("Completed Task Count", getCompletedTaskCount())
+ .add("Total Task Count", getTaskCount())).toString();
}
/**
private final LinkedBlockingQueue<Runnable> backingQueue;
- ExecutorQueue( final int maxBackingQueueSize ) {
- backingQueue = new TrackingLinkedBlockingQueue<>( maxBackingQueueSize );
+ ExecutorQueue(final int maxBackingQueueSize) {
+ backingQueue = new TrackingLinkedBlockingQueue<>(maxBackingQueueSize);
}
LinkedBlockingQueue<Runnable> getBackingQueue() {
}
@Override
- public Runnable poll( final long timeout, final TimeUnit unit ) throws InterruptedException {
- long totalWaitTime = unit.toMillis( timeout );
- long waitTime = Math.min( totalWaitTime, POLL_WAIT_TIME_IN_MS );
+ public Runnable poll(final long timeout, final TimeUnit unit) throws InterruptedException {
+ long totalWaitTime = unit.toMillis(timeout);
+ long waitTime = Math.min(totalWaitTime, POLL_WAIT_TIME_IN_MS);
Runnable task = null;
// We loop here, each time polling the backingQueue first then our queue, instead of
task = backingQueue.poll();
if (task == null) {
// No task in backing - call the base class to wait for one to be offered.
- task = super.poll( waitTime, TimeUnit.MILLISECONDS );
+ task = super.poll(waitTime, TimeUnit.MILLISECONDS);
totalWaitTime -= POLL_WAIT_TIME_IN_MS;
- if (totalWaitTime <= 0 ) {
+ if (totalWaitTime <= 0) {
break;
}
- waitTime = Math.min( totalWaitTime, POLL_WAIT_TIME_IN_MS );
+ waitTime = Math.min(totalWaitTime, POLL_WAIT_TIME_IN_MS);
}
}
private final LinkedBlockingQueue<Runnable> backingQueue;
private volatile RejectedExecutionHandler delegateRejectedExecutionHandler;
- RejectedTaskHandler( final LinkedBlockingQueue<Runnable> backingQueue,
- final RejectedExecutionHandler delegateRejectedExecutionHandler ) {
+ RejectedTaskHandler(final LinkedBlockingQueue<Runnable> backingQueue,
+ final RejectedExecutionHandler delegateRejectedExecutionHandler) {
this.backingQueue = backingQueue;
this.delegateRejectedExecutionHandler = delegateRejectedExecutionHandler;
}
void setDelegateRejectedExecutionHandler(
- final RejectedExecutionHandler delegateRejectedExecutionHandler ) {
+ final RejectedExecutionHandler delegateRejectedExecutionHandler) {
this.delegateRejectedExecutionHandler = delegateRejectedExecutionHandler;
}
}
@Override
- public void rejectedExecution( final Runnable task, final ThreadPoolExecutor executor ) {
+ public void rejectedExecution(final Runnable task, final ThreadPoolExecutor executor) {
if (executor.isShutdown()) {
- throw new RejectedExecutionException( "Executor has been shutdown." );
+ throw new RejectedExecutionException("Executor has been shutdown.");
}
if (!backingQueue.offer(task)) {
- delegateRejectedExecutionHandler.rejectedExecution( task, executor );
+ delegateRejectedExecutionHandler.rejectedExecution(task, executor);
}
}
}