From 4fb59b2820e205404695222df4578e5fce32c33f Mon Sep 17 00:00:00 2001 From: Maros Marsalek Date: Wed, 16 Jul 2014 10:27:53 +0200 Subject: [PATCH] BUG-1291 Fix executor for netconf-connector rejecting tasks. Flexible threadpool rejected tasks with SynchronousQueue and did not spawn threads with LinkedBlockingQueue. Added custom Queue that makes the underlying threadpool to spawn threads as expected. Change-Id: I28d27316cf63b3370aa802bbd31ee976aa54fbf7 Signed-off-by: Maros Marsalek --- .../util/FlexibleThreadPoolWrapper.java | 63 ++++++++++++++++++- .../flexible/FlexibleThreadPoolModule.java | 7 ++- .../main/yang/threadpool-impl-flexible.yang | 6 ++ .../configuration/initial/01-netconf.xml | 11 ++-- 4 files changed, 79 insertions(+), 8 deletions(-) diff --git a/opendaylight/config/threadpool-config-impl/src/main/java/org/opendaylight/controller/config/threadpool/util/FlexibleThreadPoolWrapper.java b/opendaylight/config/threadpool-config-impl/src/main/java/org/opendaylight/controller/config/threadpool/util/FlexibleThreadPoolWrapper.java index 3dfa6e2bc7..5036399828 100644 --- a/opendaylight/config/threadpool-config-impl/src/main/java/org/opendaylight/controller/config/threadpool/util/FlexibleThreadPoolWrapper.java +++ b/opendaylight/config/threadpool-config-impl/src/main/java/org/opendaylight/controller/config/threadpool/util/FlexibleThreadPoolWrapper.java @@ -10,15 +10,20 @@ package org.opendaylight.controller.config.threadpool.util; import java.io.Closeable; import java.io.IOException; +import java.util.concurrent.BlockingQueue; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; -import java.util.concurrent.SynchronousQueue; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.RejectedExecutionHandler; import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import org.opendaylight.controller.config.threadpool.ThreadPool; +import com.google.common.base.Optional; + /** * Implementation of {@link ThreadPool} using flexible number of threads wraps * {@link ExecutorService}. @@ -28,12 +33,33 @@ public class FlexibleThreadPoolWrapper implements ThreadPool, Closeable { public FlexibleThreadPoolWrapper(int minThreadCount, int maxThreadCount, long keepAlive, TimeUnit timeUnit, ThreadFactory threadFactory) { + this(minThreadCount, maxThreadCount, keepAlive, timeUnit, threadFactory, getQueue(Optional.absent())); + } + + public FlexibleThreadPoolWrapper(int minThreadCount, int maxThreadCount, long keepAlive, TimeUnit timeUnit, + ThreadFactory threadFactory, Optional queueCapacity) { + this(minThreadCount, maxThreadCount, keepAlive, timeUnit, threadFactory, getQueue(queueCapacity)); + } + + private FlexibleThreadPoolWrapper(int minThreadCount, int maxThreadCount, long keepAlive, TimeUnit timeUnit, + ThreadFactory threadFactory, BlockingQueue queue) { executor = new ThreadPoolExecutor(minThreadCount, maxThreadCount, keepAlive, timeUnit, - new SynchronousQueue(), threadFactory); + queue, threadFactory, new FlexibleRejectionHandler()); executor.prestartAllCoreThreads(); } + /** + * Overriding the queue: + * ThreadPoolExecutor would not create new threads if the queue is not full, thus adding + * occurs in RejectedExecutionHandler. + * This impl saturates threadpool first, then queue. When both are full caller will get blocked. + */ + private static ForwardingBlockingQueue getQueue(Optional capacity) { + final BlockingQueue delegate = capacity.isPresent() ? new LinkedBlockingQueue(capacity.get()) : new LinkedBlockingQueue(); + return new ForwardingBlockingQueue(delegate); + } + @Override public ExecutorService getExecutor() { return Executors.unconfigurableExecutorService(executor); @@ -77,4 +103,37 @@ public class FlexibleThreadPoolWrapper implements ThreadPool, Closeable { executor.shutdown(); } + /** + * if the max threads are met, then it will raise a rejectedExecution. We then push to the queue. + */ + private static class FlexibleRejectionHandler implements RejectedExecutionHandler { + @Override + public void rejectedExecution(final Runnable r, final ThreadPoolExecutor executor) { + try { + executor.getQueue().put(r); + } catch (InterruptedException e) { + throw new RejectedExecutionException("Interrupted while waiting on the queue", e); + } + } + } + + private static class ForwardingBlockingQueue extends com.google.common.util.concurrent.ForwardingBlockingQueue { + private final BlockingQueue delegate; + + public ForwardingBlockingQueue(BlockingQueue delegate) { + this.delegate = delegate; + } + + @Override + protected BlockingQueue delegate() { + return delegate; + } + + @Override + public boolean offer(final Runnable r) { + // ThreadPoolExecutor will spawn a new thread after core size is reached only + // if the queue.offer returns false. + return false; + } + } } diff --git a/opendaylight/config/threadpool-config-impl/src/main/java/org/opendaylight/controller/config/yang/threadpool/impl/flexible/FlexibleThreadPoolModule.java b/opendaylight/config/threadpool-config-impl/src/main/java/org/opendaylight/controller/config/yang/threadpool/impl/flexible/FlexibleThreadPoolModule.java index 94639d43c0..d6abe168fb 100644 --- a/opendaylight/config/threadpool-config-impl/src/main/java/org/opendaylight/controller/config/yang/threadpool/impl/flexible/FlexibleThreadPoolModule.java +++ b/opendaylight/config/threadpool-config-impl/src/main/java/org/opendaylight/controller/config/yang/threadpool/impl/flexible/FlexibleThreadPoolModule.java @@ -17,6 +17,7 @@ */ package org.opendaylight.controller.config.yang.threadpool.impl.flexible; +import com.google.common.base.Optional; import java.util.concurrent.TimeUnit; import org.opendaylight.controller.config.api.JmxAttributeValidationException; @@ -50,11 +51,15 @@ public final class FlexibleThreadPoolModule extends org.opendaylight.controller. JmxAttributeValidationException.checkNotNull(getMaxThreadCount(), maxThreadCountJmxAttribute); JmxAttributeValidationException.checkCondition(getMaxThreadCount() > 0, "must be greater than zero", maxThreadCountJmxAttribute); + + if(getQueueCapacity() != null) { + JmxAttributeValidationException.checkCondition(getQueueCapacity() > 0, "Queue capacity cannot be < 1", queueCapacityJmxAttribute); + } } @Override public java.lang.AutoCloseable createInstance() { return new FlexibleThreadPoolWrapper(getMinThreadCount(), getMaxThreadCount(), getKeepAliveMillis(), - TimeUnit.MILLISECONDS, getThreadFactoryDependency()); + TimeUnit.MILLISECONDS, getThreadFactoryDependency(), Optional.fromNullable(getQueueCapacity())); } } diff --git a/opendaylight/config/threadpool-config-impl/src/main/yang/threadpool-impl-flexible.yang b/opendaylight/config/threadpool-config-impl/src/main/yang/threadpool-impl-flexible.yang index be275ef487..c124f6388f 100644 --- a/opendaylight/config/threadpool-config-impl/src/main/yang/threadpool-impl-flexible.yang +++ b/opendaylight/config/threadpool-config-impl/src/main/yang/threadpool-impl-flexible.yang @@ -46,6 +46,12 @@ module threadpool-impl-flexible { type uint32; } + leaf queueCapacity { + type uint16; + mandatory false; + description "Capacity of queue that holds waiting tasks"; + } + container threadFactory { uses config:service-ref { refine type { diff --git a/opendaylight/distribution/opendaylight/src/main/resources/configuration/initial/01-netconf.xml b/opendaylight/distribution/opendaylight/src/main/resources/configuration/initial/01-netconf.xml index 8fedbe4d4c..f81a332ab6 100644 --- a/opendaylight/distribution/opendaylight/src/main/resources/configuration/initial/01-netconf.xml +++ b/opendaylight/distribution/opendaylight/src/main/resources/configuration/initial/01-netconf.xml @@ -12,7 +12,7 @@ - + prefix:netconf-client-dispatcher global-netconf-dispatcher @@ -30,24 +30,25 @@ - + prefix:threadfactory-naming global-netconf-processing-executor-threadfactory remote-connector-processing-executor - - + + prefix:threadpool-flexible global-netconf-processing-executor 1 4 600000 + prefix:threadfactory global-netconf-processing-executor-threadfactory - + -- 2.36.6