Merge binding-model-{api,ri}
[yangtools.git] / common / util / src / main / java / org / opendaylight / yangtools / util / concurrent / CachedThreadPoolExecutor.java
index a7dd4af009e06457925d305483c1bcb048455c62..3da7a55f82fc777a616c17a450adc17caf67c8e4 100644 (file)
@@ -7,6 +7,11 @@
  */
 package org.opendaylight.yangtools.util.concurrent;
 
+import static java.util.Objects.requireNonNull;
+
+import com.google.common.base.MoreObjects;
+import com.google.common.base.MoreObjects.ToStringHelper;
+import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.RejectedExecutionException;
@@ -14,16 +19,13 @@ import java.util.concurrent.RejectedExecutionHandler;
 import java.util.concurrent.SynchronousQueue;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
-import com.google.common.base.Objects;
-import com.google.common.base.Objects.ToStringHelper;
-import com.google.common.base.Preconditions;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.slf4j.LoggerFactory;
 
 /**
  * A ThreadPoolExecutor with a specified bounded queue capacity that favors reusing previously
  * constructed threads, when they are available, over creating new threads.
- * <p>
- * See {@link SpecialExecutors#newBoundedCachedThreadPool} for more details.
+ *
+ * <p>See {@link SpecialExecutors#newBoundedCachedThreadPool} for more details.
  *
  * @author Thomas Pantelis
  */
@@ -49,8 +51,13 @@ public class CachedThreadPoolExecutor extends ThreadPoolExecutor {
      *            the capacity of the queue.
      * @param threadPrefix
      *            the name prefix for threads created by this executor.
+     * @param loggerIdentity
+     *               the class to use as logger name for logging uncaught exceptions from the threads.
      */
-    public CachedThreadPoolExecutor( int maximumPoolSize, int maximumQueueSize, String threadPrefix ) {
+    // due to loggerIdentity argument usage
+    @SuppressWarnings("checkstyle:LoggerFactoryClassParameter")
+    public CachedThreadPoolExecutor(final int maximumPoolSize, final int maximumQueueSize, final String threadPrefix,
+            final Class<?> loggerIdentity) {
         // We're using a custom SynchronousQueue that has a backing bounded LinkedBlockingQueue.
         // We don't specify any core threads (first parameter) so, when a task is submitted,
         // the base class will always try to offer to the queue. If there is an existing waiting
@@ -60,59 +67,58 @@ public class CachedThreadPoolExecutor extends ThreadPoolExecutor {
         // been reached, the task will be rejected. We specify a RejectedTaskHandler that tries
         // to offer to the backing queue. If that succeeds, the task will execute as soon as a
         // thread becomes available. If the offer fails to the backing queue, the task is rejected.
-        super( 0, maximumPoolSize, IDLE_TIMEOUT_IN_SEC, TimeUnit.SECONDS,
-               new ExecutorQueue( maximumQueueSize ) );
+        super(0, maximumPoolSize, IDLE_TIMEOUT_IN_SEC, TimeUnit.SECONDS,
+               new ExecutorQueue(maximumQueueSize));
 
-        this.threadPrefix = Preconditions.checkNotNull( threadPrefix );
+        this.threadPrefix = requireNonNull(threadPrefix);
         this.maximumQueueSize = maximumQueueSize;
 
-        setThreadFactory( new ThreadFactoryBuilder().setDaemon( true )
-                                            .setNameFormat( this.threadPrefix + "-%d" ).build() );
+        setThreadFactory(ThreadFactoryProvider.builder().namePrefix(threadPrefix)
+                .logger(LoggerFactory.getLogger(loggerIdentity)).build().get());
 
         executorQueue = (ExecutorQueue)super.getQueue();
 
         rejectedTaskHandler = new RejectedTaskHandler(
-                executorQueue.getBackingQueue(), CountingRejectedExecutionHandler.newAbortPolicy() );
-        super.setRejectedExecutionHandler( rejectedTaskHandler );
+                executorQueue.getBackingQueue(), CountingRejectedExecutionHandler.newAbortPolicy());
+        super.setRejectedExecutionHandler(rejectedTaskHandler);
     }
 
     @Override
-    public void setRejectedExecutionHandler( RejectedExecutionHandler handler ) {
-        Preconditions.checkNotNull( handler );
-        rejectedTaskHandler.setDelegateRejectedExecutionHandler( handler );
+    public void setRejectedExecutionHandler(final RejectedExecutionHandler handler) {
+        rejectedTaskHandler.setDelegateRejectedExecutionHandler(requireNonNull(handler));
     }
 
     @Override
-    public RejectedExecutionHandler getRejectedExecutionHandler(){
+    public RejectedExecutionHandler getRejectedExecutionHandler() {
         return rejectedTaskHandler.getDelegateRejectedExecutionHandler();
     }
 
     @Override
-    public BlockingQueue<Runnable> getQueue(){
+    public BlockingQueue<Runnable> getQueue() {
         return executorQueue.getBackingQueue();
     }
 
     public long getLargestQueueSize() {
-        return ((TrackingLinkedBlockingQueue<?>)executorQueue.getBackingQueue()).getLargestQueueSize();
+        return executorQueue.getBackingQueue().getLargestQueueSize();
     }
 
-    protected ToStringHelper addToStringAttributes( ToStringHelper toStringHelper ) {
+    protected ToStringHelper addToStringAttributes(final ToStringHelper toStringHelper) {
         return toStringHelper;
     }
 
     @Override
     public final String toString() {
-        return addToStringAttributes( Objects.toStringHelper( this )
-                .add( "Thread Prefix", threadPrefix )
-                .add( "Current Thread Pool Size", getPoolSize() )
-                .add( "Largest Thread Pool Size", getLargestPoolSize() )
-                .add( "Max Thread Pool Size", getMaximumPoolSize() )
-                .add( "Current Queue Size", executorQueue.getBackingQueue().size() )
-                .add( "Largest Queue Size", getLargestQueueSize() )
-                .add( "Max Queue Size", maximumQueueSize )
-                .add( "Active Thread Count", getActiveCount() )
-                .add( "Completed Task Count", getCompletedTaskCount() )
-                .add( "Total Task Count", getTaskCount() ) ).toString();
+        return addToStringAttributes(MoreObjects.toStringHelper(this)
+                .add("Thread Prefix", threadPrefix)
+                .add("Current Thread Pool Size", getPoolSize())
+                .add("Largest Thread Pool Size", getLargestPoolSize())
+                .add("Max Thread Pool Size", getMaximumPoolSize())
+                .add("Current Queue Size", executorQueue.getBackingQueue().size())
+                .add("Largest Queue Size", getLargestQueueSize())
+                .add("Max Queue Size", maximumQueueSize)
+                .add("Active Thread Count", getActiveCount())
+                .add("Completed Task Count", getCompletedTaskCount())
+                .add("Total Task Count", getTaskCount())).toString();
     }
 
     /**
@@ -123,25 +129,27 @@ public class CachedThreadPoolExecutor extends ThreadPoolExecutor {
      * threads are busy.
      */
     private static class ExecutorQueue extends SynchronousQueue<Runnable> {
-
+        @java.io.Serial
         private static final long serialVersionUID = 1L;
 
         private static final long POLL_WAIT_TIME_IN_MS = 300;
 
-        private final LinkedBlockingQueue<Runnable> backingQueue;
+        @SuppressFBWarnings("SE_BAD_FIELD")
+        // Runnable is not Serializable
+        private final TrackingLinkedBlockingQueue<Runnable> backingQueue;
 
-        ExecutorQueue( int maxBackingQueueSize ) {
-            backingQueue = new TrackingLinkedBlockingQueue<>( maxBackingQueueSize );
+        ExecutorQueue(final int maxBackingQueueSize) {
+            backingQueue = new TrackingLinkedBlockingQueue<>(maxBackingQueueSize);
         }
 
-        LinkedBlockingQueue<Runnable> getBackingQueue() {
+        TrackingLinkedBlockingQueue<Runnable> getBackingQueue() {
             return backingQueue;
         }
 
         @Override
-        public Runnable poll( long timeout, TimeUnit unit ) throws InterruptedException {
-            long totalWaitTime = unit.toMillis( timeout );
-            long waitTime = Math.min( totalWaitTime, POLL_WAIT_TIME_IN_MS );
+        public Runnable poll(final long timeout, final TimeUnit unit) throws InterruptedException {
+            long totalWaitTime = unit.toMillis(timeout);
+            long waitTime = Math.min(totalWaitTime, POLL_WAIT_TIME_IN_MS);
             Runnable task = null;
 
             // We loop here, each time polling the backingQueue first then our queue, instead of
@@ -156,19 +164,19 @@ public class CachedThreadPoolExecutor extends ThreadPoolExecutor {
             //   periods, one thread will eventually wake up and get the task from the backingQueue
             //   and execute it, although slightly delayed.
 
-            while( task == null ) {
+            while (task == null) {
                 // First try to get a task from the backing queue.
                 task = backingQueue.poll();
-                if( task == null ) {
+                if (task == null) {
                     // No task in backing - call the base class to wait for one to be offered.
-                    task = super.poll( waitTime, TimeUnit.MILLISECONDS );
+                    task = super.poll(waitTime, TimeUnit.MILLISECONDS);
 
                     totalWaitTime -= POLL_WAIT_TIME_IN_MS;
-                    if( totalWaitTime <= 0 ) {
+                    if (totalWaitTime <= 0) {
                         break;
                     }
 
-                    waitTime = Math.min( totalWaitTime, POLL_WAIT_TIME_IN_MS );
+                    waitTime = Math.min(totalWaitTime, POLL_WAIT_TIME_IN_MS);
                 }
             }
 
@@ -193,29 +201,29 @@ public class CachedThreadPoolExecutor extends ThreadPoolExecutor {
         private final LinkedBlockingQueue<Runnable> backingQueue;
         private volatile RejectedExecutionHandler delegateRejectedExecutionHandler;
 
-        RejectedTaskHandler( LinkedBlockingQueue<Runnable> backingQueue,
-                             RejectedExecutionHandler delegateRejectedExecutionHandler ) {
+        RejectedTaskHandler(final LinkedBlockingQueue<Runnable> backingQueue,
+                             final RejectedExecutionHandler delegateRejectedExecutionHandler) {
             this.backingQueue = backingQueue;
             this.delegateRejectedExecutionHandler = delegateRejectedExecutionHandler;
         }
 
         void setDelegateRejectedExecutionHandler(
-                RejectedExecutionHandler delegateRejectedExecutionHandler ) {
+                final RejectedExecutionHandler delegateRejectedExecutionHandler) {
             this.delegateRejectedExecutionHandler = delegateRejectedExecutionHandler;
         }
 
-        RejectedExecutionHandler getDelegateRejectedExecutionHandler(){
+        RejectedExecutionHandler getDelegateRejectedExecutionHandler() {
             return delegateRejectedExecutionHandler;
         }
 
         @Override
-        public void rejectedExecution( Runnable task, ThreadPoolExecutor executor ) {
-            if( executor.isShutdown() ) {
-                throw new RejectedExecutionException( "Executor has been shutdown." );
+        public void rejectedExecution(final Runnable task, final ThreadPoolExecutor executor) {
+            if (executor.isShutdown()) {
+                throw new RejectedExecutionException("Executor has been shutdown.");
             }
 
-            if( !backingQueue.offer( task ) ) {
-                delegateRejectedExecutionHandler.rejectedExecution( task, executor );
+            if (!backingQueue.offer(task)) {
+                delegateRejectedExecutionHandler.rejectedExecution(task, executor);
             }
         }
     }