Merge "Added tests for yang.model.util"
[yangtools.git] / common / util / src / main / java / org / opendaylight / yangtools / util / concurrent / CachedThreadPoolExecutor.java
index 4936efaba132fa150d60fa3c316975f6d81b6746..a7dd4af009e06457925d305483c1bcb048455c62 100644 (file)
@@ -14,8 +14,6 @@ import java.util.concurrent.RejectedExecutionHandler;
 import java.util.concurrent.SynchronousQueue;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLong;
-
 import com.google.common.base.Objects;
 import com.google.common.base.Objects.ToStringHelper;
 import com.google.common.base.Preconditions;
@@ -33,8 +31,6 @@ public class CachedThreadPoolExecutor extends ThreadPoolExecutor {
 
     private static final long IDLE_TIMEOUT_IN_SEC = 60L;
 
-    private final AtomicLong largestBackingQueueSize = new AtomicLong( 0 );
-
     private final ExecutorQueue executorQueue;
 
     private final String threadPrefix;
@@ -76,22 +72,28 @@ public class CachedThreadPoolExecutor extends ThreadPoolExecutor {
         executorQueue = (ExecutorQueue)super.getQueue();
 
         rejectedTaskHandler = new RejectedTaskHandler(
-                executorQueue.getBackingQueue(), largestBackingQueueSize );
+                executorQueue.getBackingQueue(), CountingRejectedExecutionHandler.newAbortPolicy() );
         super.setRejectedExecutionHandler( rejectedTaskHandler );
     }
 
     @Override
     public void setRejectedExecutionHandler( RejectedExecutionHandler handler ) {
+        Preconditions.checkNotNull( handler );
         rejectedTaskHandler.setDelegateRejectedExecutionHandler( handler );
     }
 
+    @Override
+    public RejectedExecutionHandler getRejectedExecutionHandler(){
+        return rejectedTaskHandler.getDelegateRejectedExecutionHandler();
+    }
+
     @Override
     public BlockingQueue<Runnable> getQueue(){
         return executorQueue.getBackingQueue();
     }
 
     public long getLargestQueueSize() {
-        return largestBackingQueueSize.get();
+        return ((TrackingLinkedBlockingQueue<?>)executorQueue.getBackingQueue()).getLargestQueueSize();
     }
 
     protected ToStringHelper addToStringAttributes( ToStringHelper toStringHelper ) {
@@ -129,7 +131,7 @@ public class CachedThreadPoolExecutor extends ThreadPoolExecutor {
         private final LinkedBlockingQueue<Runnable> backingQueue;
 
         ExecutorQueue( int maxBackingQueueSize ) {
-            backingQueue = new LinkedBlockingQueue<>( maxBackingQueueSize );
+            backingQueue = new TrackingLinkedBlockingQueue<>( maxBackingQueueSize );
         }
 
         LinkedBlockingQueue<Runnable> getBackingQueue() {
@@ -189,20 +191,23 @@ public class CachedThreadPoolExecutor extends ThreadPoolExecutor {
     private static class RejectedTaskHandler implements RejectedExecutionHandler {
 
         private final LinkedBlockingQueue<Runnable> backingQueue;
-        private final AtomicLong largestBackingQueueSize;
         private volatile RejectedExecutionHandler delegateRejectedExecutionHandler;
 
         RejectedTaskHandler( LinkedBlockingQueue<Runnable> backingQueue,
-                             AtomicLong largestBackingQueueSize ) {
+                             RejectedExecutionHandler delegateRejectedExecutionHandler ) {
             this.backingQueue = backingQueue;
-            this.largestBackingQueueSize = largestBackingQueueSize;
+            this.delegateRejectedExecutionHandler = delegateRejectedExecutionHandler;
         }
 
         void setDelegateRejectedExecutionHandler(
-                RejectedExecutionHandler delegateRejectedExecutionHandler ){
+                RejectedExecutionHandler delegateRejectedExecutionHandler ) {
             this.delegateRejectedExecutionHandler = delegateRejectedExecutionHandler;
         }
 
+        RejectedExecutionHandler getDelegateRejectedExecutionHandler(){
+            return delegateRejectedExecutionHandler;
+        }
+
         @Override
         public void rejectedExecution( Runnable task, ThreadPoolExecutor executor ) {
             if( executor.isShutdown() ) {
@@ -210,19 +215,7 @@ public class CachedThreadPoolExecutor extends ThreadPoolExecutor {
             }
 
             if( !backingQueue.offer( task ) ) {
-                if( delegateRejectedExecutionHandler != null ) {
-                    delegateRejectedExecutionHandler.rejectedExecution( task, executor );
-                } else {
-                    throw new RejectedExecutionException(
-                                                "All threads are in use and the queue is full" );
-                }
-            }
-
-            largestBackingQueueSize.incrementAndGet();
-            long size = backingQueue.size();
-            long largest = largestBackingQueueSize.get();
-            if( size > largest ) {
-                largestBackingQueueSize.compareAndSet( largest, size );
+                delegateRejectedExecutionHandler.rejectedExecution( task, executor );
             }
         }
     }