*/
package org.opendaylight.yangtools.util.concurrent;
+import static java.util.Objects.requireNonNull;
+
+import com.google.common.base.MoreObjects;
+import com.google.common.base.MoreObjects.ToStringHelper;
+import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
-import com.google.common.base.Objects;
-import com.google.common.base.Objects.ToStringHelper;
-import com.google.common.base.Preconditions;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.slf4j.LoggerFactory;
/**
* A ThreadPoolExecutor with a specified bounded queue capacity that favors reusing previously
* constructed threads, when they are available, over creating new threads.
- * <p>
- * See {@link SpecialExecutors#newBoundedCachedThreadPool} for more details.
+ *
+ * <p>See {@link SpecialExecutors#newBoundedCachedThreadPool} for more details.
*
* @author Thomas Pantelis
*/
* the capacity of the queue.
* @param threadPrefix
* the name prefix for threads created by this executor.
+ * @param loggerIdentity
+ * the class to use as logger name for logging uncaught exceptions from the threads.
*/
- public CachedThreadPoolExecutor( int maximumPoolSize, int maximumQueueSize, String threadPrefix ) {
+ // due to loggerIdentity argument usage
+ @SuppressWarnings("checkstyle:LoggerFactoryClassParameter")
+ public CachedThreadPoolExecutor(final int maximumPoolSize, final int maximumQueueSize, final String threadPrefix,
+ final Class<?> loggerIdentity) {
// We're using a custom SynchronousQueue that has a backing bounded LinkedBlockingQueue.
// We don't specify any core threads (first parameter) so, when a task is submitted,
// the base class will always try to offer to the queue. If there is an existing waiting
// been reached, the task will be rejected. We specify a RejectedTaskHandler that tries
// to offer to the backing queue. If that succeeds, the task will execute as soon as a
// thread becomes available. If the offer fails to the backing queue, the task is rejected.
- super( 0, maximumPoolSize, IDLE_TIMEOUT_IN_SEC, TimeUnit.SECONDS,
- new ExecutorQueue( maximumQueueSize ) );
+ super(0, maximumPoolSize, IDLE_TIMEOUT_IN_SEC, TimeUnit.SECONDS,
+ new ExecutorQueue(maximumQueueSize));
- this.threadPrefix = Preconditions.checkNotNull( threadPrefix );
+ this.threadPrefix = requireNonNull(threadPrefix);
this.maximumQueueSize = maximumQueueSize;
- setThreadFactory( new ThreadFactoryBuilder().setDaemon( true )
- .setNameFormat( this.threadPrefix + "-%d" ).build() );
+ setThreadFactory(ThreadFactoryProvider.builder().namePrefix(threadPrefix)
+ .logger(LoggerFactory.getLogger(loggerIdentity)).build().get());
executorQueue = (ExecutorQueue)super.getQueue();
rejectedTaskHandler = new RejectedTaskHandler(
- executorQueue.getBackingQueue(), CountingRejectedExecutionHandler.newAbortPolicy() );
- super.setRejectedExecutionHandler( rejectedTaskHandler );
+ executorQueue.getBackingQueue(), CountingRejectedExecutionHandler.newAbortPolicy());
+ super.setRejectedExecutionHandler(rejectedTaskHandler);
}
@Override
- public void setRejectedExecutionHandler( RejectedExecutionHandler handler ) {
- Preconditions.checkNotNull( handler );
- rejectedTaskHandler.setDelegateRejectedExecutionHandler( handler );
+ public void setRejectedExecutionHandler(final RejectedExecutionHandler handler) {
+ rejectedTaskHandler.setDelegateRejectedExecutionHandler(requireNonNull(handler));
}
@Override
- public RejectedExecutionHandler getRejectedExecutionHandler(){
+ public RejectedExecutionHandler getRejectedExecutionHandler() {
return rejectedTaskHandler.getDelegateRejectedExecutionHandler();
}
@Override
- public BlockingQueue<Runnable> getQueue(){
+ public BlockingQueue<Runnable> getQueue() {
return executorQueue.getBackingQueue();
}
public long getLargestQueueSize() {
- return ((TrackingLinkedBlockingQueue<?>)executorQueue.getBackingQueue()).getLargestQueueSize();
+ return executorQueue.getBackingQueue().getLargestQueueSize();
}
- protected ToStringHelper addToStringAttributes( ToStringHelper toStringHelper ) {
+ protected ToStringHelper addToStringAttributes(final ToStringHelper toStringHelper) {
return toStringHelper;
}
@Override
public final String toString() {
- return addToStringAttributes( Objects.toStringHelper( this )
- .add( "Thread Prefix", threadPrefix )
- .add( "Current Thread Pool Size", getPoolSize() )
- .add( "Largest Thread Pool Size", getLargestPoolSize() )
- .add( "Max Thread Pool Size", getMaximumPoolSize() )
- .add( "Current Queue Size", executorQueue.getBackingQueue().size() )
- .add( "Largest Queue Size", getLargestQueueSize() )
- .add( "Max Queue Size", maximumQueueSize )
- .add( "Active Thread Count", getActiveCount() )
- .add( "Completed Task Count", getCompletedTaskCount() )
- .add( "Total Task Count", getTaskCount() ) ).toString();
+ return addToStringAttributes(MoreObjects.toStringHelper(this)
+ .add("Thread Prefix", threadPrefix)
+ .add("Current Thread Pool Size", getPoolSize())
+ .add("Largest Thread Pool Size", getLargestPoolSize())
+ .add("Max Thread Pool Size", getMaximumPoolSize())
+ .add("Current Queue Size", executorQueue.getBackingQueue().size())
+ .add("Largest Queue Size", getLargestQueueSize())
+ .add("Max Queue Size", maximumQueueSize)
+ .add("Active Thread Count", getActiveCount())
+ .add("Completed Task Count", getCompletedTaskCount())
+ .add("Total Task Count", getTaskCount())).toString();
}
/**
* threads are busy.
*/
private static class ExecutorQueue extends SynchronousQueue<Runnable> {
-
+ @java.io.Serial
private static final long serialVersionUID = 1L;
private static final long POLL_WAIT_TIME_IN_MS = 300;
- private final LinkedBlockingQueue<Runnable> backingQueue;
+ @SuppressFBWarnings("SE_BAD_FIELD")
+ // Runnable is not Serializable
+ private final TrackingLinkedBlockingQueue<Runnable> backingQueue;
- ExecutorQueue( int maxBackingQueueSize ) {
- backingQueue = new TrackingLinkedBlockingQueue<>( maxBackingQueueSize );
+ ExecutorQueue(final int maxBackingQueueSize) {
+ backingQueue = new TrackingLinkedBlockingQueue<>(maxBackingQueueSize);
}
- LinkedBlockingQueue<Runnable> getBackingQueue() {
+ TrackingLinkedBlockingQueue<Runnable> getBackingQueue() {
return backingQueue;
}
@Override
- public Runnable poll( long timeout, TimeUnit unit ) throws InterruptedException {
- long totalWaitTime = unit.toMillis( timeout );
- long waitTime = Math.min( totalWaitTime, POLL_WAIT_TIME_IN_MS );
+ public Runnable poll(final long timeout, final TimeUnit unit) throws InterruptedException {
+ long totalWaitTime = unit.toMillis(timeout);
+ long waitTime = Math.min(totalWaitTime, POLL_WAIT_TIME_IN_MS);
Runnable task = null;
// We loop here, each time polling the backingQueue first then our queue, instead of
// periods, one thread will eventually wake up and get the task from the backingQueue
// and execute it, although slightly delayed.
- while( task == null ) {
+ while (task == null) {
// First try to get a task from the backing queue.
task = backingQueue.poll();
- if( task == null ) {
+ if (task == null) {
// No task in backing - call the base class to wait for one to be offered.
- task = super.poll( waitTime, TimeUnit.MILLISECONDS );
+ task = super.poll(waitTime, TimeUnit.MILLISECONDS);
totalWaitTime -= POLL_WAIT_TIME_IN_MS;
- if( totalWaitTime <= 0 ) {
+ if (totalWaitTime <= 0) {
break;
}
- waitTime = Math.min( totalWaitTime, POLL_WAIT_TIME_IN_MS );
+ waitTime = Math.min(totalWaitTime, POLL_WAIT_TIME_IN_MS);
}
}
private final LinkedBlockingQueue<Runnable> backingQueue;
private volatile RejectedExecutionHandler delegateRejectedExecutionHandler;
- RejectedTaskHandler( LinkedBlockingQueue<Runnable> backingQueue,
- RejectedExecutionHandler delegateRejectedExecutionHandler ) {
+ RejectedTaskHandler(final LinkedBlockingQueue<Runnable> backingQueue,
+ final RejectedExecutionHandler delegateRejectedExecutionHandler) {
this.backingQueue = backingQueue;
this.delegateRejectedExecutionHandler = delegateRejectedExecutionHandler;
}
void setDelegateRejectedExecutionHandler(
- RejectedExecutionHandler delegateRejectedExecutionHandler ) {
+ final RejectedExecutionHandler delegateRejectedExecutionHandler) {
this.delegateRejectedExecutionHandler = delegateRejectedExecutionHandler;
}
- RejectedExecutionHandler getDelegateRejectedExecutionHandler(){
+ RejectedExecutionHandler getDelegateRejectedExecutionHandler() {
return delegateRejectedExecutionHandler;
}
@Override
- public void rejectedExecution( Runnable task, ThreadPoolExecutor executor ) {
- if( executor.isShutdown() ) {
- throw new RejectedExecutionException( "Executor has been shutdown." );
+ public void rejectedExecution(final Runnable task, final ThreadPoolExecutor executor) {
+ if (executor.isShutdown()) {
+ throw new RejectedExecutionException("Executor has been shutdown.");
}
- if( !backingQueue.offer( task ) ) {
- delegateRejectedExecutionHandler.rejectedExecution( task, executor );
+ if (!backingQueue.offer(task)) {
+ delegateRejectedExecutionHandler.rejectedExecution(task, executor);
}
}
}