/*
 * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved.
 *
 * This program and the accompanying materials are made available under the
 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
 * and is available at http://www.eclipse.org/legal/epl-v10.html
 */
9 package org.opendaylight.controller.config.threadpool.util;
11 import com.google.common.base.Optional;
12 import java.io.Closeable;
13 import java.io.IOException;
14 import java.util.concurrent.BlockingQueue;
15 import java.util.concurrent.ExecutorService;
16 import java.util.concurrent.Executors;
17 import java.util.concurrent.LinkedBlockingQueue;
18 import java.util.concurrent.RejectedExecutionException;
19 import java.util.concurrent.RejectedExecutionHandler;
20 import java.util.concurrent.ThreadFactory;
21 import java.util.concurrent.ThreadPoolExecutor;
22 import java.util.concurrent.TimeUnit;
23 import org.opendaylight.controller.config.threadpool.ThreadPool;
/**
 * Implementation of {@link ThreadPool} that uses a flexible number of threads and wraps an
 * {@link ExecutorService}.
 */
29 public class FlexibleThreadPoolWrapper implements ThreadPool, Closeable {
30 private final ThreadPoolExecutor executor;
/**
 * Creates a pool without specifying a queue capacity.
 *
 * <p>Equivalent to calling the capacity-aware constructor with an absent capacity.
 *
 * @param minThreadCount core pool size passed to the underlying executor
 * @param maxThreadCount maximum pool size passed to the underlying executor
 * @param keepAlive keep-alive time passed to the underlying executor
 * @param timeUnit unit in which {@code keepAlive} is expressed
 * @param threadFactory factory the underlying executor uses to create its threads
 */
public FlexibleThreadPoolWrapper(final int minThreadCount, final int maxThreadCount, final long keepAlive,
        final TimeUnit timeUnit, final ThreadFactory threadFactory) {
    this(minThreadCount, maxThreadCount, keepAlive, timeUnit, threadFactory, Optional.<Integer>absent());
}
/**
 * Creates a pool whose work queue is optionally bounded.
 *
 * @param minThreadCount core pool size passed to the underlying executor
 * @param maxThreadCount maximum pool size passed to the underlying executor
 * @param keepAlive keep-alive time passed to the underlying executor
 * @param timeUnit unit in which {@code keepAlive} is expressed
 * @param threadFactory factory the underlying executor uses to create its threads
 * @param queueCapacity capacity of the work queue; when absent the queue is created without an explicit
 *        capacity
 */
public FlexibleThreadPoolWrapper(
        final int minThreadCount,
        final int maxThreadCount,
        final long keepAlive,
        final TimeUnit timeUnit,
        final ThreadFactory threadFactory,
        final Optional<Integer> queueCapacity) {
    this(minThreadCount, maxThreadCount, keepAlive, timeUnit, threadFactory, getQueue(queueCapacity));
}
42 private FlexibleThreadPoolWrapper(int minThreadCount, int maxThreadCount, long keepAlive, TimeUnit timeUnit,
43 ThreadFactory threadFactory, BlockingQueue<Runnable> queue) {
45 executor = new ThreadPoolExecutor(minThreadCount, maxThreadCount, keepAlive, timeUnit,
46 queue, threadFactory, new FlexibleRejectionHandler());
47 executor.prestartAllCoreThreads();
51 * Overriding the queue:
52 * ThreadPoolExecutor would not create new threads if the queue is not full, thus adding
53 * occurs in RejectedExecutionHandler.
54 * This impl saturates threadpool first, then queue. When both are full caller will get blocked.
56 private static ForwardingBlockingQueue getQueue(Optional<Integer> capacity) {
57 final BlockingQueue<Runnable> delegate = capacity.isPresent() ? new LinkedBlockingQueue<Runnable>(capacity.get()) : new LinkedBlockingQueue<Runnable>();
58 return new ForwardingBlockingQueue(delegate);
62 public ExecutorService getExecutor() {
63 return Executors.unconfigurableExecutorService(executor);
66 public int getMinThreadCount() {
67 return executor.getCorePoolSize();
70 public void setMinThreadCount(int minThreadCount) {
71 executor.setCorePoolSize(minThreadCount);
75 public int getMaxThreadCount() {
76 return executor.getMaximumPoolSize();
79 public void setMaxThreadCount(int maxThreadCount) {
80 executor.setMaximumPoolSize(maxThreadCount);
83 public long getKeepAliveMillis() {
84 return executor.getKeepAliveTime(TimeUnit.MILLISECONDS);
87 public void setKeepAliveMillis(long keepAliveMillis) {
88 executor.setKeepAliveTime(keepAliveMillis, TimeUnit.MILLISECONDS);
91 public void setThreadFactory(ThreadFactory threadFactory) {
92 executor.setThreadFactory(threadFactory);
95 public void prestartAllCoreThreads() {
96 executor.prestartAllCoreThreads();
100 public void close() throws IOException {
105 * if the max threads are met, then it will raise a rejectedExecution. We then push to the queue.
107 private static class FlexibleRejectionHandler implements RejectedExecutionHandler {
109 public void rejectedExecution(final Runnable r, final ThreadPoolExecutor executor) {
111 executor.getQueue().put(r);
112 } catch (InterruptedException e) {
113 throw new RejectedExecutionException("Interrupted while waiting on the queue", e);
118 private static class ForwardingBlockingQueue extends com.google.common.util.concurrent.ForwardingBlockingQueue<Runnable> {
119 private final BlockingQueue<Runnable> delegate;
/**
 * Creates a forwarding queue around the given queue.
 *
 * @param delegate queue stored as the target of this forwarding queue
 */
public ForwardingBlockingQueue(final BlockingQueue<Runnable> delegate) {
    super();
    this.delegate = delegate;
}
126 protected BlockingQueue<Runnable> delegate() {
131 public boolean offer(final Runnable r) {
132 // ThreadPoolExecutor will spawn a new thread after core size is reached only
133 // if the queue.offer returns false.