package org.opendaylight.yangtools.util.concurrent;
-import java.util.concurrent.LinkedBlockingQueue;
+import com.google.common.base.MoreObjects;
+import com.google.common.base.MoreObjects.ToStringHelper;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
-import com.google.common.base.Objects;
-import com.google.common.base.Objects.ToStringHelper;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
-
/**
* A ThreadPoolExecutor with a specified bounded queue capacity that favors creating new threads
* over queuing, as the former is faster.
* <p>
- * See {@link SpecialExecutors#newFastBlockingThreadPool} for more details.
+ * See {@link SpecialExecutors#newBoundedFastThreadPool} for more details.
*
* @author Thomas Pantelis
*/
* @param threadPrefix
* the name prefix for threads created by this executor.
*/
- public FastThreadPoolExecutor( int maximumPoolSize, int maximumQueueSize, String threadPrefix ) {
+ public FastThreadPoolExecutor( final int maximumPoolSize, final int maximumQueueSize, final String threadPrefix ) {
this( maximumPoolSize, maximumQueueSize, DEFAULT_IDLE_TIMEOUT_IN_SEC, TimeUnit.SECONDS,
threadPrefix );
}
* @param threadPrefix
* the name prefix for threads created by this executor.
*/
- public FastThreadPoolExecutor( int maximumPoolSize, int maximumQueueSize, long keepAliveTime,
- TimeUnit unit, String threadPrefix ) {
+ public FastThreadPoolExecutor( final int maximumPoolSize, final int maximumQueueSize, final long keepAliveTime,
+ final TimeUnit unit, final String threadPrefix ) {
// We use all core threads (the first 2 parameters below equal) so, when a task is submitted,
// if the thread limit hasn't been reached, a new thread will be spawned to execute
// the task even if there is an existing idle thread in the pool. This is faster than
// reached, subsequent tasks will be queued. If the queue is full, tasks will be rejected.
super( maximumPoolSize, maximumPoolSize, keepAliveTime, unit,
- new LinkedBlockingQueue<Runnable>( maximumQueueSize ) );
+ new TrackingLinkedBlockingQueue<Runnable>( maximumQueueSize ) );
this.threadPrefix = threadPrefix;
this.maximumQueueSize = maximumQueueSize;
setThreadFactory( new ThreadFactoryBuilder().setDaemon( true )
.setNameFormat( threadPrefix + "-%d" ).build() );
- if( keepAliveTime > 0 ) {
+ if (keepAliveTime > 0) {
// Need to specifically configure core threads to timeout.
allowCoreThreadTimeOut( true );
}
+
+ setRejectedExecutionHandler( CountingRejectedExecutionHandler.newAbortPolicy() );
+ }
+
+ public long getLargestQueueSize() {
+ return ((TrackingLinkedBlockingQueue<?>)getQueue()).getLargestQueueSize();
}
- protected ToStringHelper addToStringAttributes( ToStringHelper toStringHelper ) {
+ protected ToStringHelper addToStringAttributes( final ToStringHelper toStringHelper ) {
return toStringHelper;
}
@Override
public final String toString() {
- return addToStringAttributes( Objects.toStringHelper( this )
+ return addToStringAttributes( MoreObjects.toStringHelper( this )
.add( "Thread Prefix", threadPrefix )
.add( "Current Thread Pool Size", getPoolSize() )
.add( "Largest Thread Pool Size", getLargestPoolSize() )
.add( "Max Thread Pool Size", getMaximumPoolSize() )
.add( "Current Queue Size", getQueue().size() )
+ .add( "Largest Queue Size", getLargestQueueSize() )
.add( "Max Queue Size", maximumQueueSize )
.add( "Active Thread Count", getActiveCount() )
.add( "Completed Task Count", getCompletedTaskCount() )