Bump odlparent to 6.0.0
[controller.git] / opendaylight / config / threadpool-config-impl / src / main / java / org / opendaylight / controller / config / threadpool / util / FlexibleThreadPoolWrapper.java
index 3dfa6e2bc756419b18f3224b32b127255e00d35d..9949e36d3714f55b6936e66d3d596ef9a5b9b96c 100644 (file)
@@ -5,18 +5,19 @@
  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
  * and is available at http://www.eclipse.org/legal/epl-v10.html
  */
-
 package org.opendaylight.controller.config.threadpool.util;
 
 import java.io.Closeable;
-import java.io.IOException;
+import java.util.OptionalInt;
+import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
-import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.RejectedExecutionHandler;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
-
 import org.opendaylight.controller.config.threadpool.ThreadPool;
 
 /**
@@ -26,14 +27,36 @@ import org.opendaylight.controller.config.threadpool.ThreadPool;
 public class FlexibleThreadPoolWrapper implements ThreadPool, Closeable {
     private final ThreadPoolExecutor executor;
 
-    public FlexibleThreadPoolWrapper(int minThreadCount, int maxThreadCount, long keepAlive, TimeUnit timeUnit,
-            ThreadFactory threadFactory) {
+    public FlexibleThreadPoolWrapper(final int minThreadCount, final int maxThreadCount, final long keepAlive,
+            final TimeUnit timeUnit, final ThreadFactory threadFactory) {
+        this(minThreadCount, maxThreadCount, keepAlive, timeUnit, threadFactory, getQueue(OptionalInt.empty()));
+    }
+
+    public FlexibleThreadPoolWrapper(final int minThreadCount, final int maxThreadCount, final long keepAlive,
+            final TimeUnit timeUnit, final ThreadFactory threadFactory, final OptionalInt queueCapacity) {
+        this(minThreadCount, maxThreadCount, keepAlive, timeUnit, threadFactory, getQueue(queueCapacity));
+    }
+
+    private FlexibleThreadPoolWrapper(final int minThreadCount, final int maxThreadCount, final long keepAlive,
+            final TimeUnit timeUnit, final ThreadFactory threadFactory, final BlockingQueue<Runnable> queue) {
 
         executor = new ThreadPoolExecutor(minThreadCount, maxThreadCount, keepAlive, timeUnit,
-                new SynchronousQueue<Runnable>(), threadFactory);
+                queue, threadFactory, new FlexibleRejectionHandler());
         executor.prestartAllCoreThreads();
     }
 
+    /**
+     * Overriding the queue:
+     * ThreadPoolExecutor would not create new threads if the queue is not full, thus adding
+     * occurs in RejectedExecutionHandler.
+     * This impl saturates threadpool first, then queue. When both are full caller will get blocked.
+     */
+    private static ForwardingBlockingQueue getQueue(final OptionalInt capacity) {
+        final BlockingQueue<Runnable> delegate = capacity.isPresent() ? new LinkedBlockingQueue<>(capacity.getAsInt())
+                : new LinkedBlockingQueue<>();
+        return new ForwardingBlockingQueue(delegate);
+    }
+
     @Override
     public ExecutorService getExecutor() {
         return Executors.unconfigurableExecutorService(executor);
@@ -43,7 +66,7 @@ public class FlexibleThreadPoolWrapper implements ThreadPool, Closeable {
         return executor.getCorePoolSize();
     }
 
-    public void setMinThreadCount(int minThreadCount) {
+    public void setMinThreadCount(final int minThreadCount) {
         executor.setCorePoolSize(minThreadCount);
     }
 
@@ -52,7 +75,7 @@ public class FlexibleThreadPoolWrapper implements ThreadPool, Closeable {
         return executor.getMaximumPoolSize();
     }
 
-    public void setMaxThreadCount(int maxThreadCount) {
+    public void setMaxThreadCount(final int maxThreadCount) {
         executor.setMaximumPoolSize(maxThreadCount);
     }
 
@@ -60,11 +83,11 @@ public class FlexibleThreadPoolWrapper implements ThreadPool, Closeable {
         return executor.getKeepAliveTime(TimeUnit.MILLISECONDS);
     }
 
-    public void setKeepAliveMillis(long keepAliveMillis) {
+    public void setKeepAliveMillis(final long keepAliveMillis) {
         executor.setKeepAliveTime(keepAliveMillis, TimeUnit.MILLISECONDS);
     }
 
-    public void setThreadFactory(ThreadFactory threadFactory) {
+    public void setThreadFactory(final ThreadFactory threadFactory) {
         executor.setThreadFactory(threadFactory);
     }
 
@@ -73,8 +96,44 @@ public class FlexibleThreadPoolWrapper implements ThreadPool, Closeable {
     }
 
     @Override
-    public void close() throws IOException {
+    public void close() {
         executor.shutdown();
     }
 
+    /**
+     * if the max threads are met, then it will raise a rejectedExecution. We then push to the queue.
+     */
+    private static class FlexibleRejectionHandler implements RejectedExecutionHandler {
+        @Override
+        @SuppressWarnings("checkstyle:parameterName")
+        public void rejectedExecution(final Runnable r, final ThreadPoolExecutor executor) {
+            try {
+                executor.getQueue().put(r);
+            } catch (InterruptedException e) {
+                throw new RejectedExecutionException("Interrupted while waiting on the queue", e);
+            }
+        }
+    }
+
+    private static class ForwardingBlockingQueue
+            extends com.google.common.util.concurrent.ForwardingBlockingQueue<Runnable> {
+        private final BlockingQueue<Runnable> delegate;
+
+        ForwardingBlockingQueue(final BlockingQueue<Runnable> delegate) {
+            this.delegate = delegate;
+        }
+
+        @Override
+        protected BlockingQueue<Runnable> delegate() {
+            return delegate;
+        }
+
+        @Override
+        @SuppressWarnings("checkstyle:parameterName")
+        public boolean offer(final Runnable o) {
+            // ThreadPoolExecutor will spawn a new thread after core size is reached only
+            // if the queue.offer returns false.
+            return false;
+        }
+    }
 }