X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fconfig%2Fthreadpool-config-impl%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fconfig%2Fthreadpool%2Futil%2FFlexibleThreadPoolWrapper.java;h=9949e36d3714f55b6936e66d3d596ef9a5b9b96c;hb=refs%2Fchanges%2F02%2F83802%2F42;hp=3dfa6e2bc756419b18f3224b32b127255e00d35d;hpb=caee336f062eba4909ba53cbaccdde0714236134;p=controller.git diff --git a/opendaylight/config/threadpool-config-impl/src/main/java/org/opendaylight/controller/config/threadpool/util/FlexibleThreadPoolWrapper.java b/opendaylight/config/threadpool-config-impl/src/main/java/org/opendaylight/controller/config/threadpool/util/FlexibleThreadPoolWrapper.java index 3dfa6e2bc7..9949e36d37 100644 --- a/opendaylight/config/threadpool-config-impl/src/main/java/org/opendaylight/controller/config/threadpool/util/FlexibleThreadPoolWrapper.java +++ b/opendaylight/config/threadpool-config-impl/src/main/java/org/opendaylight/controller/config/threadpool/util/FlexibleThreadPoolWrapper.java @@ -5,18 +5,19 @@ * terms of the Eclipse Public License v1.0 which accompanies this distribution, * and is available at http://www.eclipse.org/legal/epl-v10.html */ - package org.opendaylight.controller.config.threadpool.util; import java.io.Closeable; -import java.io.IOException; +import java.util.OptionalInt; +import java.util.concurrent.BlockingQueue; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; -import java.util.concurrent.SynchronousQueue; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.RejectedExecutionHandler; import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; - import org.opendaylight.controller.config.threadpool.ThreadPool; /** @@ -26,14 +27,36 @@ import org.opendaylight.controller.config.threadpool.ThreadPool; public class FlexibleThreadPoolWrapper implements ThreadPool, Closeable { private final ThreadPoolExecutor executor; - public FlexibleThreadPoolWrapper(int minThreadCount, int maxThreadCount, long keepAlive, TimeUnit timeUnit, - ThreadFactory threadFactory) { + public FlexibleThreadPoolWrapper(final int minThreadCount, final int maxThreadCount, final long keepAlive, + final TimeUnit timeUnit, final ThreadFactory threadFactory) { + this(minThreadCount, maxThreadCount, keepAlive, timeUnit, threadFactory, getQueue(OptionalInt.empty())); + } + + public FlexibleThreadPoolWrapper(final int minThreadCount, final int maxThreadCount, final long keepAlive, + final TimeUnit timeUnit, final ThreadFactory threadFactory, final OptionalInt queueCapacity) { + this(minThreadCount, maxThreadCount, keepAlive, timeUnit, threadFactory, getQueue(queueCapacity)); + } + + private FlexibleThreadPoolWrapper(final int minThreadCount, final int maxThreadCount, final long keepAlive, + final TimeUnit timeUnit, final ThreadFactory threadFactory, final BlockingQueue queue) { executor = new ThreadPoolExecutor(minThreadCount, maxThreadCount, keepAlive, timeUnit, - new SynchronousQueue(), threadFactory); + queue, threadFactory, new FlexibleRejectionHandler()); executor.prestartAllCoreThreads(); } + /** + * Overriding the queue: + * ThreadPoolExecutor would not create new threads if the queue is not full, thus adding + * occurs in RejectedExecutionHandler. + * This impl saturates threadpool first, then queue. When both are full caller will get blocked. + */ + private static ForwardingBlockingQueue getQueue(final OptionalInt capacity) { + final BlockingQueue delegate = capacity.isPresent() ? new LinkedBlockingQueue<>(capacity.getAsInt()) + : new LinkedBlockingQueue<>(); + return new ForwardingBlockingQueue(delegate); + } + @Override public ExecutorService getExecutor() { return Executors.unconfigurableExecutorService(executor); @@ -43,7 +66,7 @@ public class FlexibleThreadPoolWrapper implements ThreadPool, Closeable { return executor.getCorePoolSize(); } - public void setMinThreadCount(int minThreadCount) { + public void setMinThreadCount(final int minThreadCount) { executor.setCorePoolSize(minThreadCount); } @@ -52,7 +75,7 @@ public class FlexibleThreadPoolWrapper implements ThreadPool, Closeable { return executor.getMaximumPoolSize(); } - public void setMaxThreadCount(int maxThreadCount) { + public void setMaxThreadCount(final int maxThreadCount) { executor.setMaximumPoolSize(maxThreadCount); } @@ -60,11 +83,11 @@ public class FlexibleThreadPoolWrapper implements ThreadPool, Closeable { return executor.getKeepAliveTime(TimeUnit.MILLISECONDS); } - public void setKeepAliveMillis(long keepAliveMillis) { + public void setKeepAliveMillis(final long keepAliveMillis) { executor.setKeepAliveTime(keepAliveMillis, TimeUnit.MILLISECONDS); } - public void setThreadFactory(ThreadFactory threadFactory) { + public void setThreadFactory(final ThreadFactory threadFactory) { executor.setThreadFactory(threadFactory); } @@ -73,8 +96,44 @@ public class FlexibleThreadPoolWrapper implements ThreadPool, Closeable { } @Override - public void close() throws IOException { + public void close() { executor.shutdown(); } + /** + * if the max threads are met, then it will raise a rejectedExecution. We then push to the queue. + */ + private static class FlexibleRejectionHandler implements RejectedExecutionHandler { + @Override + @SuppressWarnings("checkstyle:parameterName") + public void rejectedExecution(final Runnable r, final ThreadPoolExecutor executor) { + try { + executor.getQueue().put(r); + } catch (InterruptedException e) { + throw new RejectedExecutionException("Interrupted while waiting on the queue", e); + } + } + } + + private static class ForwardingBlockingQueue + extends com.google.common.util.concurrent.ForwardingBlockingQueue { + private final BlockingQueue delegate; + + ForwardingBlockingQueue(final BlockingQueue delegate) { + this.delegate = delegate; + } + + @Override + protected BlockingQueue delegate() { + return delegate; + } + + @Override + @SuppressWarnings("checkstyle:parameterName") + public boolean offer(final Runnable o) { + // ThreadPoolExecutor will spawn a new thread after core size is reached only + // if the queue.offer returns false. + return false; + } + } }