Use @Serial in util
[yangtools.git] / common / util / src / main / java / org / opendaylight / yangtools / util / concurrent / CachedThreadPoolExecutor.java
index e7672ac4586351f81e8e977c649c2ed3d2a47df8..e3f1f40f4ce5a0c960285e1a73832d9c199df553 100644 (file)
@@ -7,10 +7,12 @@
  */
 package org.opendaylight.yangtools.util.concurrent;
 
+import static java.util.Objects.requireNonNull;
+
 import com.google.common.base.MoreObjects;
 import com.google.common.base.MoreObjects.ToStringHelper;
-import com.google.common.base.Preconditions;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
+import java.io.Serial;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.RejectedExecutionException;
@@ -18,12 +20,13 @@ import java.util.concurrent.RejectedExecutionHandler;
 import java.util.concurrent.SynchronousQueue;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
+import org.slf4j.LoggerFactory;
 
 /**
  * A ThreadPoolExecutor with a specified bounded queue capacity that favors reusing previously
  * constructed threads, when they are available, over creating new threads.
- * <p>
- * See {@link SpecialExecutors#newBoundedCachedThreadPool} for more details.
+ *
+ * <p>See {@link SpecialExecutors#newBoundedCachedThreadPool} for more details.
  *
  * @author Thomas Pantelis
  */
@@ -49,8 +52,13 @@ public class CachedThreadPoolExecutor extends ThreadPoolExecutor {
      *            the capacity of the queue.
      * @param threadPrefix
      *            the name prefix for threads created by this executor.
+     * @param loggerIdentity
+     *               the class to use as logger name for logging uncaught exceptions from the threads.
      */
-    public CachedThreadPoolExecutor( final int maximumPoolSize, final int maximumQueueSize, final String threadPrefix ) {
+    // due to loggerIdentity argument usage
+    @SuppressWarnings("checkstyle:LoggerFactoryClassParameter")
+    public CachedThreadPoolExecutor(final int maximumPoolSize, final int maximumQueueSize, final String threadPrefix,
+            final Class<?> loggerIdentity) {
         // We're using a custom SynchronousQueue that has a backing bounded LinkedBlockingQueue.
         // We don't specify any core threads (first parameter) so, when a task is submitted,
         // the base class will always try to offer to the queue. If there is an existing waiting
@@ -60,26 +68,25 @@ public class CachedThreadPoolExecutor extends ThreadPoolExecutor {
         // been reached, the task will be rejected. We specify a RejectedTaskHandler that tries
         // to offer to the backing queue. If that succeeds, the task will execute as soon as a
         // thread becomes available. If the offer fails to the backing queue, the task is rejected.
-        super( 0, maximumPoolSize, IDLE_TIMEOUT_IN_SEC, TimeUnit.SECONDS,
-               new ExecutorQueue( maximumQueueSize ) );
+        super(0, maximumPoolSize, IDLE_TIMEOUT_IN_SEC, TimeUnit.SECONDS,
+               new ExecutorQueue(maximumQueueSize));
 
-        this.threadPrefix = Preconditions.checkNotNull( threadPrefix );
+        this.threadPrefix = requireNonNull(threadPrefix);
         this.maximumQueueSize = maximumQueueSize;
 
-        setThreadFactory( new ThreadFactoryBuilder().setDaemon( true )
-                                            .setNameFormat( this.threadPrefix + "-%d" ).build() );
+        setThreadFactory(ThreadFactoryProvider.builder().namePrefix(threadPrefix)
+                .logger(LoggerFactory.getLogger(loggerIdentity)).build().get());
 
         executorQueue = (ExecutorQueue)super.getQueue();
 
         rejectedTaskHandler = new RejectedTaskHandler(
-                executorQueue.getBackingQueue(), CountingRejectedExecutionHandler.newAbortPolicy() );
-        super.setRejectedExecutionHandler( rejectedTaskHandler );
+                executorQueue.getBackingQueue(), CountingRejectedExecutionHandler.newAbortPolicy());
+        super.setRejectedExecutionHandler(rejectedTaskHandler);
     }
 
     @Override
-    public void setRejectedExecutionHandler( final RejectedExecutionHandler handler ) {
-        Preconditions.checkNotNull( handler );
-        rejectedTaskHandler.setDelegateRejectedExecutionHandler( handler );
+    public void setRejectedExecutionHandler(final RejectedExecutionHandler handler) {
+        rejectedTaskHandler.setDelegateRejectedExecutionHandler(requireNonNull(handler));
     }
 
     @Override
@@ -93,26 +100,26 @@ public class CachedThreadPoolExecutor extends ThreadPoolExecutor {
     }
 
     public long getLargestQueueSize() {
-        return ((TrackingLinkedBlockingQueue<?>)executorQueue.getBackingQueue()).getLargestQueueSize();
+        return executorQueue.getBackingQueue().getLargestQueueSize();
     }
 
-    protected ToStringHelper addToStringAttributes( final ToStringHelper toStringHelper ) {
+    protected ToStringHelper addToStringAttributes(final ToStringHelper toStringHelper) {
         return toStringHelper;
     }
 
     @Override
     public final String toString() {
-        return addToStringAttributes( MoreObjects.toStringHelper( this )
-                .add( "Thread Prefix", threadPrefix )
-                .add( "Current Thread Pool Size", getPoolSize() )
-                .add( "Largest Thread Pool Size", getLargestPoolSize() )
-                .add( "Max Thread Pool Size", getMaximumPoolSize() )
-                .add( "Current Queue Size", executorQueue.getBackingQueue().size() )
-                .add( "Largest Queue Size", getLargestQueueSize() )
-                .add( "Max Queue Size", maximumQueueSize )
-                .add( "Active Thread Count", getActiveCount() )
-                .add( "Completed Task Count", getCompletedTaskCount() )
-                .add( "Total Task Count", getTaskCount() ) ).toString();
+        return addToStringAttributes(MoreObjects.toStringHelper(this)
+                .add("Thread Prefix", threadPrefix)
+                .add("Current Thread Pool Size", getPoolSize())
+                .add("Largest Thread Pool Size", getLargestPoolSize())
+                .add("Max Thread Pool Size", getMaximumPoolSize())
+                .add("Current Queue Size", executorQueue.getBackingQueue().size())
+                .add("Largest Queue Size", getLargestQueueSize())
+                .add("Max Queue Size", maximumQueueSize)
+                .add("Active Thread Count", getActiveCount())
+                .add("Completed Task Count", getCompletedTaskCount())
+                .add("Total Task Count", getTaskCount())).toString();
     }
 
     /**
@@ -123,25 +130,27 @@ public class CachedThreadPoolExecutor extends ThreadPoolExecutor {
      * threads are busy.
      */
     private static class ExecutorQueue extends SynchronousQueue<Runnable> {
-
+        @Serial
         private static final long serialVersionUID = 1L;
 
         private static final long POLL_WAIT_TIME_IN_MS = 300;
 
-        private final LinkedBlockingQueue<Runnable> backingQueue;
+        @SuppressFBWarnings("SE_BAD_FIELD")
+        // Runnable is not Serializable
+        private final TrackingLinkedBlockingQueue<Runnable> backingQueue;
 
-        ExecutorQueue( final int maxBackingQueueSize ) {
-            backingQueue = new TrackingLinkedBlockingQueue<>( maxBackingQueueSize );
+        ExecutorQueue(final int maxBackingQueueSize) {
+            backingQueue = new TrackingLinkedBlockingQueue<>(maxBackingQueueSize);
         }
 
-        LinkedBlockingQueue<Runnable> getBackingQueue() {
+        TrackingLinkedBlockingQueue<Runnable> getBackingQueue() {
             return backingQueue;
         }
 
         @Override
-        public Runnable poll( final long timeout, final TimeUnit unit ) throws InterruptedException {
-            long totalWaitTime = unit.toMillis( timeout );
-            long waitTime = Math.min( totalWaitTime, POLL_WAIT_TIME_IN_MS );
+        public Runnable poll(final long timeout, final TimeUnit unit) throws InterruptedException {
+            long totalWaitTime = unit.toMillis(timeout);
+            long waitTime = Math.min(totalWaitTime, POLL_WAIT_TIME_IN_MS);
             Runnable task = null;
 
             // We loop here, each time polling the backingQueue first then our queue, instead of
@@ -161,14 +170,14 @@ public class CachedThreadPoolExecutor extends ThreadPoolExecutor {
                 task = backingQueue.poll();
                 if (task == null) {
                     // No task in backing - call the base class to wait for one to be offered.
-                    task = super.poll( waitTime, TimeUnit.MILLISECONDS );
+                    task = super.poll(waitTime, TimeUnit.MILLISECONDS);
 
                     totalWaitTime -= POLL_WAIT_TIME_IN_MS;
-                    if( totalWaitTime <= 0 ) {
+                    if (totalWaitTime <= 0) {
                         break;
                     }
 
-                    waitTime = Math.min( totalWaitTime, POLL_WAIT_TIME_IN_MS );
+                    waitTime = Math.min(totalWaitTime, POLL_WAIT_TIME_IN_MS);
                 }
             }
 
@@ -193,29 +202,29 @@ public class CachedThreadPoolExecutor extends ThreadPoolExecutor {
         private final LinkedBlockingQueue<Runnable> backingQueue;
         private volatile RejectedExecutionHandler delegateRejectedExecutionHandler;
 
-        RejectedTaskHandler( final LinkedBlockingQueue<Runnable> backingQueue,
-                             final RejectedExecutionHandler delegateRejectedExecutionHandler ) {
+        RejectedTaskHandler(final LinkedBlockingQueue<Runnable> backingQueue,
+                             final RejectedExecutionHandler delegateRejectedExecutionHandler) {
             this.backingQueue = backingQueue;
             this.delegateRejectedExecutionHandler = delegateRejectedExecutionHandler;
         }
 
         void setDelegateRejectedExecutionHandler(
-                final RejectedExecutionHandler delegateRejectedExecutionHandler ) {
+                final RejectedExecutionHandler delegateRejectedExecutionHandler) {
             this.delegateRejectedExecutionHandler = delegateRejectedExecutionHandler;
         }
 
-        RejectedExecutionHandler getDelegateRejectedExecutionHandler(){
+        RejectedExecutionHandler getDelegateRejectedExecutionHandler() {
             return delegateRejectedExecutionHandler;
         }
 
         @Override
-        public void rejectedExecution( final Runnable task, final ThreadPoolExecutor executor ) {
+        public void rejectedExecution(final Runnable task, final ThreadPoolExecutor executor) {
             if (executor.isShutdown()) {
-                throw new RejectedExecutionException( "Executor has been shutdown." );
+                throw new RejectedExecutionException("Executor has been shutdown.");
             }
 
             if (!backingQueue.offer(task)) {
-                delegateRejectedExecutionHandler.rejectedExecution( task, executor );
+                delegateRejectedExecutionHandler.rejectedExecution(task, executor);
             }
         }
     }