* terms of the Eclipse Public License v1.0 which accompanies this distribution,
* and is available at http://www.eclipse.org/legal/epl-v10.html
*/
-
package org.opendaylight.controller.config.threadpool.util;
import java.io.Closeable;
-import java.io.IOException;
+import java.util.OptionalInt;
+import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
-import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
-
import org.opendaylight.controller.config.threadpool.ThreadPool;
/**
public class FlexibleThreadPoolWrapper implements ThreadPool, Closeable {
private final ThreadPoolExecutor executor;
- public FlexibleThreadPoolWrapper(int minThreadCount, int maxThreadCount, long keepAlive, TimeUnit timeUnit,
- ThreadFactory threadFactory) {
+ public FlexibleThreadPoolWrapper(final int minThreadCount, final int maxThreadCount, final long keepAlive,
+ final TimeUnit timeUnit, final ThreadFactory threadFactory) {
+ this(minThreadCount, maxThreadCount, keepAlive, timeUnit, threadFactory, getQueue(OptionalInt.empty()));
+ }
+
+ public FlexibleThreadPoolWrapper(final int minThreadCount, final int maxThreadCount, final long keepAlive,
+ final TimeUnit timeUnit, final ThreadFactory threadFactory, final OptionalInt queueCapacity) {
+ this(minThreadCount, maxThreadCount, keepAlive, timeUnit, threadFactory, getQueue(queueCapacity));
+ }
+
+ private FlexibleThreadPoolWrapper(final int minThreadCount, final int maxThreadCount, final long keepAlive,
+ final TimeUnit timeUnit, final ThreadFactory threadFactory, final BlockingQueue<Runnable> queue) {
executor = new ThreadPoolExecutor(minThreadCount, maxThreadCount, keepAlive, timeUnit,
- new SynchronousQueue<Runnable>(), threadFactory);
+ queue, threadFactory, new FlexibleRejectionHandler());
executor.prestartAllCoreThreads();
}
+ /**
+ * Overriding the queue:
+ * ThreadPoolExecutor would not create new threads if the queue is not full, thus adding
+ * occurs in RejectedExecutionHandler.
+ * This impl saturates threadpool first, then queue. When both are full caller will get blocked.
+ */
+ private static ForwardingBlockingQueue getQueue(final OptionalInt capacity) {
+ final BlockingQueue<Runnable> delegate = capacity.isPresent() ? new LinkedBlockingQueue<>(capacity.getAsInt())
+ : new LinkedBlockingQueue<>();
+ return new ForwardingBlockingQueue(delegate);
+ }
+
@Override
public ExecutorService getExecutor() {
return Executors.unconfigurableExecutorService(executor);
return executor.getCorePoolSize();
}
- public void setMinThreadCount(int minThreadCount) {
+ public void setMinThreadCount(final int minThreadCount) {
executor.setCorePoolSize(minThreadCount);
}
return executor.getMaximumPoolSize();
}
- public void setMaxThreadCount(int maxThreadCount) {
+ public void setMaxThreadCount(final int maxThreadCount) {
executor.setMaximumPoolSize(maxThreadCount);
}
return executor.getKeepAliveTime(TimeUnit.MILLISECONDS);
}
- public void setKeepAliveMillis(long keepAliveMillis) {
+ public void setKeepAliveMillis(final long keepAliveMillis) {
executor.setKeepAliveTime(keepAliveMillis, TimeUnit.MILLISECONDS);
}
- public void setThreadFactory(ThreadFactory threadFactory) {
+ public void setThreadFactory(final ThreadFactory threadFactory) {
executor.setThreadFactory(threadFactory);
}
}
@Override
- public void close() throws IOException {
+ public void close() {
executor.shutdown();
}
+ /**
+ * if the max threads are met, then it will raise a rejectedExecution. We then push to the queue.
+ */
+ private static class FlexibleRejectionHandler implements RejectedExecutionHandler {
+ @Override
+ @SuppressWarnings("checkstyle:parameterName")
+ public void rejectedExecution(final Runnable r, final ThreadPoolExecutor executor) {
+ try {
+ executor.getQueue().put(r);
+ } catch (InterruptedException e) {
+ throw new RejectedExecutionException("Interrupted while waiting on the queue", e);
+ }
+ }
+ }
+
+ private static class ForwardingBlockingQueue
+ extends com.google.common.util.concurrent.ForwardingBlockingQueue<Runnable> {
+ private final BlockingQueue<Runnable> delegate;
+
+ ForwardingBlockingQueue(final BlockingQueue<Runnable> delegate) {
+ this.delegate = delegate;
+ }
+
+ @Override
+ protected BlockingQueue<Runnable> delegate() {
+ return delegate;
+ }
+
+ @Override
+ @SuppressWarnings("checkstyle:parameterName")
+ public boolean offer(final Runnable o) {
+ // ThreadPoolExecutor will spawn a new thread after core size is reached only
+ // if the queue.offer returns false.
+ return false;
+ }
+ }
}