Scripted update of if statements
[yangtools.git] / common / util / src / main / java / org / opendaylight / yangtools / util / concurrent / CachedThreadPoolExecutor.java
index 4936efaba132fa150d60fa3c316975f6d81b6746..7f2e77f73141c5819bea990834a86008c7b5701e 100644 (file)
@@ -7,6 +7,10 @@
  */
 package org.opendaylight.yangtools.util.concurrent;
 
+import com.google.common.base.MoreObjects;
+import com.google.common.base.MoreObjects.ToStringHelper;
+import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.RejectedExecutionException;
@@ -14,12 +18,6 @@ import java.util.concurrent.RejectedExecutionHandler;
 import java.util.concurrent.SynchronousQueue;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLong;
-
-import com.google.common.base.Objects;
-import com.google.common.base.Objects.ToStringHelper;
-import com.google.common.base.Preconditions;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
 
 /**
  * A ThreadPoolExecutor with a specified bounded queue capacity that favors reusing previously
@@ -33,8 +31,6 @@ public class CachedThreadPoolExecutor extends ThreadPoolExecutor {
 
     private static final long IDLE_TIMEOUT_IN_SEC = 60L;
 
-    private final AtomicLong largestBackingQueueSize = new AtomicLong( 0 );
-
     private final ExecutorQueue executorQueue;
 
     private final String threadPrefix;
@@ -54,7 +50,7 @@ public class CachedThreadPoolExecutor extends ThreadPoolExecutor {
      * @param threadPrefix
      *            the name prefix for threads created by this executor.
      */
-    public CachedThreadPoolExecutor( int maximumPoolSize, int maximumQueueSize, String threadPrefix ) {
+    public CachedThreadPoolExecutor( final int maximumPoolSize, final int maximumQueueSize, final String threadPrefix ) {
         // We're using a custom SynchronousQueue that has a backing bounded LinkedBlockingQueue.
         // We don't specify any core threads (first parameter) so, when a task is submitted,
         // the base class will always try to offer to the queue. If there is an existing waiting
@@ -76,31 +72,37 @@ public class CachedThreadPoolExecutor extends ThreadPoolExecutor {
         executorQueue = (ExecutorQueue)super.getQueue();
 
         rejectedTaskHandler = new RejectedTaskHandler(
-                executorQueue.getBackingQueue(), largestBackingQueueSize );
+                executorQueue.getBackingQueue(), CountingRejectedExecutionHandler.newAbortPolicy() );
         super.setRejectedExecutionHandler( rejectedTaskHandler );
     }
 
     @Override
-    public void setRejectedExecutionHandler( RejectedExecutionHandler handler ) {
+    public void setRejectedExecutionHandler( final RejectedExecutionHandler handler ) {
+        Preconditions.checkNotNull( handler );
         rejectedTaskHandler.setDelegateRejectedExecutionHandler( handler );
     }
 
     @Override
-    public BlockingQueue<Runnable> getQueue(){
+    public RejectedExecutionHandler getRejectedExecutionHandler() {
+        return rejectedTaskHandler.getDelegateRejectedExecutionHandler();
+    }
+
+    @Override
+    public BlockingQueue<Runnable> getQueue() {
         return executorQueue.getBackingQueue();
     }
 
     public long getLargestQueueSize() {
-        return largestBackingQueueSize.get();
+        return ((TrackingLinkedBlockingQueue<?>)executorQueue.getBackingQueue()).getLargestQueueSize();
     }
 
-    protected ToStringHelper addToStringAttributes( ToStringHelper toStringHelper ) {
+    protected ToStringHelper addToStringAttributes( final ToStringHelper toStringHelper ) {
         return toStringHelper;
     }
 
     @Override
     public final String toString() {
-        return addToStringAttributes( Objects.toStringHelper( this )
+        return addToStringAttributes( MoreObjects.toStringHelper( this )
                 .add( "Thread Prefix", threadPrefix )
                 .add( "Current Thread Pool Size", getPoolSize() )
                 .add( "Largest Thread Pool Size", getLargestPoolSize() )
@@ -128,8 +130,8 @@ public class CachedThreadPoolExecutor extends ThreadPoolExecutor {
 
         private final LinkedBlockingQueue<Runnable> backingQueue;
 
-        ExecutorQueue( int maxBackingQueueSize ) {
-            backingQueue = new LinkedBlockingQueue<>( maxBackingQueueSize );
+        ExecutorQueue( final int maxBackingQueueSize ) {
+            backingQueue = new TrackingLinkedBlockingQueue<>( maxBackingQueueSize );
         }
 
         LinkedBlockingQueue<Runnable> getBackingQueue() {
@@ -137,7 +139,7 @@ public class CachedThreadPoolExecutor extends ThreadPoolExecutor {
         }
 
         @Override
-        public Runnable poll( long timeout, TimeUnit unit ) throws InterruptedException {
+        public Runnable poll( final long timeout, final TimeUnit unit ) throws InterruptedException {
             long totalWaitTime = unit.toMillis( timeout );
             long waitTime = Math.min( totalWaitTime, POLL_WAIT_TIME_IN_MS );
             Runnable task = null;
@@ -154,15 +156,15 @@ public class CachedThreadPoolExecutor extends ThreadPoolExecutor {
             //   periods, one thread will eventually wake up and get the task from the backingQueue
             //   and execute it, although slightly delayed.
 
-            while( task == null ) {
+            while (task == null) {
                 // First try to get a task from the backing queue.
                 task = backingQueue.poll();
-                if( task == null ) {
+                if (task == null) {
                     // No task in backing - call the base class to wait for one to be offered.
                     task = super.poll( waitTime, TimeUnit.MILLISECONDS );
 
                     totalWaitTime -= POLL_WAIT_TIME_IN_MS;
-                    iftotalWaitTime <= 0 ) {
+                    if (totalWaitTime <= 0 ) {
                         break;
                     }
 
@@ -189,40 +191,31 @@ public class CachedThreadPoolExecutor extends ThreadPoolExecutor {
     private static class RejectedTaskHandler implements RejectedExecutionHandler {
 
         private final LinkedBlockingQueue<Runnable> backingQueue;
-        private final AtomicLong largestBackingQueueSize;
         private volatile RejectedExecutionHandler delegateRejectedExecutionHandler;
 
-        RejectedTaskHandler( LinkedBlockingQueue<Runnable> backingQueue,
-                             AtomicLong largestBackingQueueSize ) {
+        RejectedTaskHandler( final LinkedBlockingQueue<Runnable> backingQueue,
+                             final RejectedExecutionHandler delegateRejectedExecutionHandler ) {
             this.backingQueue = backingQueue;
-            this.largestBackingQueueSize = largestBackingQueueSize;
+            this.delegateRejectedExecutionHandler = delegateRejectedExecutionHandler;
         }
 
         void setDelegateRejectedExecutionHandler(
-                RejectedExecutionHandler delegateRejectedExecutionHandler ){
+                final RejectedExecutionHandler delegateRejectedExecutionHandler ) {
             this.delegateRejectedExecutionHandler = delegateRejectedExecutionHandler;
         }
 
+        RejectedExecutionHandler getDelegateRejectedExecutionHandler(){
+            return delegateRejectedExecutionHandler;
+        }
+
         @Override
-        public void rejectedExecution( Runnable task, ThreadPoolExecutor executor ) {
-            if( executor.isShutdown() ) {
+        public void rejectedExecution( final Runnable task, final ThreadPoolExecutor executor ) {
+            if (executor.isShutdown()) {
                 throw new RejectedExecutionException( "Executor has been shutdown." );
             }
 
-            if( !backingQueue.offer( task ) ) {
-                if( delegateRejectedExecutionHandler != null ) {
-                    delegateRejectedExecutionHandler.rejectedExecution( task, executor );
-                } else {
-                    throw new RejectedExecutionException(
-                                                "All threads are in use and the queue is full" );
-                }
-            }
-
-            largestBackingQueueSize.incrementAndGet();
-            long size = backingQueue.size();
-            long largest = largestBackingQueueSize.get();
-            if( size > largest ) {
-                largestBackingQueueSize.compareAndSet( largest, size );
+            if (!backingQueue.offer(task)) {
+                delegateRejectedExecutionHandler.rejectedExecution( task, executor );
             }
         }
     }