X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fconfig%2Fthreadpool-config-impl%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fconfig%2Fthreadpool%2Futil%2FFlexibleThreadPoolWrapper.java;h=5036399828a539e5ec5deb451881f3e4e9218b68;hp=3dfa6e2bc756419b18f3224b32b127255e00d35d;hb=17d82f582a6bc13c78be3b19954ff8c021180e93;hpb=caee336f062eba4909ba53cbaccdde0714236134 diff --git a/opendaylight/config/threadpool-config-impl/src/main/java/org/opendaylight/controller/config/threadpool/util/FlexibleThreadPoolWrapper.java b/opendaylight/config/threadpool-config-impl/src/main/java/org/opendaylight/controller/config/threadpool/util/FlexibleThreadPoolWrapper.java index 3dfa6e2bc7..5036399828 100644 --- a/opendaylight/config/threadpool-config-impl/src/main/java/org/opendaylight/controller/config/threadpool/util/FlexibleThreadPoolWrapper.java +++ b/opendaylight/config/threadpool-config-impl/src/main/java/org/opendaylight/controller/config/threadpool/util/FlexibleThreadPoolWrapper.java @@ -10,15 +10,20 @@ package org.opendaylight.controller.config.threadpool.util; import java.io.Closeable; import java.io.IOException; +import java.util.concurrent.BlockingQueue; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; -import java.util.concurrent.SynchronousQueue; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.RejectedExecutionHandler; import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import org.opendaylight.controller.config.threadpool.ThreadPool; +import com.google.common.base.Optional; + /** * Implementation of {@link ThreadPool} using flexible number of threads wraps * {@link ExecutorService}. @@ -28,12 +33,33 @@ public class FlexibleThreadPoolWrapper implements ThreadPool, Closeable { public FlexibleThreadPoolWrapper(int minThreadCount, int maxThreadCount, long keepAlive, TimeUnit timeUnit, ThreadFactory threadFactory) { + this(minThreadCount, maxThreadCount, keepAlive, timeUnit, threadFactory, getQueue(Optional.absent())); + } + + public FlexibleThreadPoolWrapper(int minThreadCount, int maxThreadCount, long keepAlive, TimeUnit timeUnit, + ThreadFactory threadFactory, Optional queueCapacity) { + this(minThreadCount, maxThreadCount, keepAlive, timeUnit, threadFactory, getQueue(queueCapacity)); + } + + private FlexibleThreadPoolWrapper(int minThreadCount, int maxThreadCount, long keepAlive, TimeUnit timeUnit, + ThreadFactory threadFactory, BlockingQueue queue) { executor = new ThreadPoolExecutor(minThreadCount, maxThreadCount, keepAlive, timeUnit, - new SynchronousQueue(), threadFactory); + queue, threadFactory, new FlexibleRejectionHandler()); executor.prestartAllCoreThreads(); } + /** + * Overriding the queue: + * ThreadPoolExecutor would not create new threads if the queue is not full, thus adding + * occurs in RejectedExecutionHandler. + * This impl saturates threadpool first, then queue. When both are full caller will get blocked. + */ + private static ForwardingBlockingQueue getQueue(Optional capacity) { + final BlockingQueue delegate = capacity.isPresent() ? new LinkedBlockingQueue(capacity.get()) : new LinkedBlockingQueue(); + return new ForwardingBlockingQueue(delegate); + } + @Override public ExecutorService getExecutor() { return Executors.unconfigurableExecutorService(executor); @@ -77,4 +103,37 @@ public class FlexibleThreadPoolWrapper implements ThreadPool, Closeable { executor.shutdown(); } + /** + * if the max threads are met, then it will raise a rejectedExecution. We then push to the queue. + */ + private static class FlexibleRejectionHandler implements RejectedExecutionHandler { + @Override + public void rejectedExecution(final Runnable r, final ThreadPoolExecutor executor) { + try { + executor.getQueue().put(r); + } catch (InterruptedException e) { + throw new RejectedExecutionException("Interrupted while waiting on the queue", e); + } + } + } + + private static class ForwardingBlockingQueue extends com.google.common.util.concurrent.ForwardingBlockingQueue { + private final BlockingQueue delegate; + + public ForwardingBlockingQueue(BlockingQueue delegate) { + this.delegate = delegate; + } + + @Override + protected BlockingQueue delegate() { + return delegate; + } + + @Override + public boolean offer(final Runnable r) { + // ThreadPoolExecutor will spawn a new thread after core size is reached only + // if the queue.offer returns false. + return false; + } + } }