import java.io.Closeable;
import java.io.IOException;
+import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
-import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.opendaylight.controller.config.threadpool.ThreadPool;
+import com.google.common.base.Optional;
+
/**
* Implementation of {@link ThreadPool} using flexible number of threads wraps
* {@link ExecutorService}.
public FlexibleThreadPoolWrapper(int minThreadCount, int maxThreadCount, long keepAlive, TimeUnit timeUnit,
ThreadFactory threadFactory) {
+ this(minThreadCount, maxThreadCount, keepAlive, timeUnit, threadFactory, getQueue(Optional.<Integer>absent()));
+ }
+
+ public FlexibleThreadPoolWrapper(int minThreadCount, int maxThreadCount, long keepAlive, TimeUnit timeUnit,
+ ThreadFactory threadFactory, Optional<Integer> queueCapacity) {
+ this(minThreadCount, maxThreadCount, keepAlive, timeUnit, threadFactory, getQueue(queueCapacity));
+ }
+
+ private FlexibleThreadPoolWrapper(int minThreadCount, int maxThreadCount, long keepAlive, TimeUnit timeUnit,
+ ThreadFactory threadFactory, BlockingQueue<Runnable> queue) {
executor = new ThreadPoolExecutor(minThreadCount, maxThreadCount, keepAlive, timeUnit,
- new SynchronousQueue<Runnable>(), threadFactory);
+ queue, threadFactory, new FlexibleRejectionHandler());
executor.prestartAllCoreThreads();
}
+ /**
+ * Overriding the queue:
+ * ThreadPoolExecutor would not create new threads if the queue is not full, thus adding
+ * occurs in RejectedExecutionHandler.
+ * This impl saturates threadpool first, then queue. When both are full caller will get blocked.
+ */
+ private static ForwardingBlockingQueue getQueue(Optional<Integer> capacity) {
+ final BlockingQueue<Runnable> delegate = capacity.isPresent() ? new LinkedBlockingQueue<Runnable>(capacity.get()) : new LinkedBlockingQueue<Runnable>();
+ return new ForwardingBlockingQueue(delegate);
+ }
+
@Override
public ExecutorService getExecutor() {
return Executors.unconfigurableExecutorService(executor);
executor.shutdown();
}
+ /**
+ * if the max threads are met, then it will raise a rejectedExecution. We then push to the queue.
+ */
+ private static class FlexibleRejectionHandler implements RejectedExecutionHandler {
+ @Override
+ public void rejectedExecution(final Runnable r, final ThreadPoolExecutor executor) {
+ try {
+ executor.getQueue().put(r);
+ } catch (InterruptedException e) {
+ throw new RejectedExecutionException("Interrupted while waiting on the queue", e);
+ }
+ }
+ }
+
+ private static class ForwardingBlockingQueue extends com.google.common.util.concurrent.ForwardingBlockingQueue<Runnable> {
+ private final BlockingQueue<Runnable> delegate;
+
+ public ForwardingBlockingQueue(BlockingQueue<Runnable> delegate) {
+ this.delegate = delegate;
+ }
+
+ @Override
+ protected BlockingQueue<Runnable> delegate() {
+ return delegate;
+ }
+
+ @Override
+ public boolean offer(final Runnable r) {
+ // ThreadPoolExecutor will spawn a new thread after core size is reached only
+ // if the queue.offer returns false.
+ return false;
+ }
+ }
}
*/
package org.opendaylight.controller.config.yang.threadpool.impl.flexible;
+import com.google.common.base.Optional;
import java.util.concurrent.TimeUnit;
import org.opendaylight.controller.config.api.JmxAttributeValidationException;
JmxAttributeValidationException.checkNotNull(getMaxThreadCount(), maxThreadCountJmxAttribute);
JmxAttributeValidationException.checkCondition(getMaxThreadCount() > 0, "must be greater than zero",
maxThreadCountJmxAttribute);
+
+ if(getQueueCapacity() != null) {
+ JmxAttributeValidationException.checkCondition(getQueueCapacity() > 0, "Queue capacity cannot be < 1", queueCapacityJmxAttribute);
+ }
}
@Override
public java.lang.AutoCloseable createInstance() {
return new FlexibleThreadPoolWrapper(getMinThreadCount(), getMaxThreadCount(), getKeepAliveMillis(),
- TimeUnit.MILLISECONDS, getThreadFactoryDependency());
+ TimeUnit.MILLISECONDS, getThreadFactoryDependency(), Optional.fromNullable(getQueueCapacity()));
}
}
<data xmlns="urn:ietf:params:xml:ns:netconf:base:1.0">
<modules xmlns="urn:opendaylight:params:xml:ns:yang:controller:config">
- <!-- Netconf dispatcher to be used by all netconf-connectors -->
+ <!-- Netconf dispatcher to be used by all netconf-connectors -->
<module>
<type xmlns:prefix="urn:opendaylight:params:xml:ns:yang:controller:config:netconf:client:dispatcher">prefix:netconf-client-dispatcher</type>
<name>global-netconf-dispatcher</name>
</timer>
</module>
- <!-- Thread factory to be used by all threadpools in netconf-connectors -->
+ <!-- Thread factory to be used by all threadpools in netconf-connectors -->
<module>
<type xmlns:prefix="urn:opendaylight:params:xml:ns:yang:controller:threadpool:impl">prefix:threadfactory-naming</type>
<name>global-netconf-processing-executor-threadfactory</name>
<name-prefix xmlns="urn:opendaylight:params:xml:ns:yang:controller:threadpool:impl">remote-connector-processing-executor</name-prefix>
- </module>
- <!-- Flexible threadpool for all netconf connectors, Max thread count is set to 4 -->
+ </module>
+ <!-- flexible threadpool for all netconf connectors, Max thread count is set to 4. -->
<module>
<type xmlns:prefix="urn:opendaylight:params:xml:ns:yang:controller:threadpool:impl:flexible">prefix:threadpool-flexible</type>
<name>global-netconf-processing-executor</name>
<minThreadCount xmlns="urn:opendaylight:params:xml:ns:yang:controller:threadpool:impl:flexible">1</minThreadCount>
<max-thread-count xmlns="urn:opendaylight:params:xml:ns:yang:controller:threadpool:impl:flexible">4</max-thread-count>
<keepAliveMillis xmlns="urn:opendaylight:params:xml:ns:yang:controller:threadpool:impl:flexible">600000</keepAliveMillis>
+
<threadFactory xmlns="urn:opendaylight:params:xml:ns:yang:controller:threadpool:impl:flexible">
<type xmlns:prefix="urn:opendaylight:params:xml:ns:yang:controller:threadpool">prefix:threadfactory</type>
<name>global-netconf-processing-executor-threadfactory</name>
</threadFactory>
- </module>
+ </module>
</modules>
<services xmlns="urn:opendaylight:params:xml:ns:yang:controller:config">