Use OptionalInt in FlexibleThreadPoolWrapper
[controller.git] / opendaylight / config / threadpool-config-impl / src / main / java / org / opendaylight / controller / config / threadpool / util / FlexibleThreadPoolWrapper.java
1 /*
2  * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.config.threadpool.util;
9
10 import java.io.Closeable;
11 import java.util.OptionalInt;
12 import java.util.concurrent.BlockingQueue;
13 import java.util.concurrent.ExecutorService;
14 import java.util.concurrent.Executors;
15 import java.util.concurrent.LinkedBlockingQueue;
16 import java.util.concurrent.RejectedExecutionException;
17 import java.util.concurrent.RejectedExecutionHandler;
18 import java.util.concurrent.ThreadFactory;
19 import java.util.concurrent.ThreadPoolExecutor;
20 import java.util.concurrent.TimeUnit;
21 import org.opendaylight.controller.config.threadpool.ThreadPool;
22
23 /**
24  * Implementation of {@link ThreadPool} using flexible number of threads wraps
25  * {@link ExecutorService}.
26  */
27 public class FlexibleThreadPoolWrapper implements ThreadPool, Closeable {
28     private final ThreadPoolExecutor executor;
29
30     public FlexibleThreadPoolWrapper(final int minThreadCount, final int maxThreadCount, final long keepAlive,
31             final TimeUnit timeUnit, final ThreadFactory threadFactory) {
32         this(minThreadCount, maxThreadCount, keepAlive, timeUnit, threadFactory, getQueue(OptionalInt.empty()));
33     }
34
35     public FlexibleThreadPoolWrapper(final int minThreadCount, final int maxThreadCount, final long keepAlive,
36             final TimeUnit timeUnit, final ThreadFactory threadFactory, final OptionalInt queueCapacity) {
37         this(minThreadCount, maxThreadCount, keepAlive, timeUnit, threadFactory, getQueue(queueCapacity));
38     }
39
40     private FlexibleThreadPoolWrapper(final int minThreadCount, final int maxThreadCount, final long keepAlive,
41             final TimeUnit timeUnit, final ThreadFactory threadFactory, final BlockingQueue<Runnable> queue) {
42
43         executor = new ThreadPoolExecutor(minThreadCount, maxThreadCount, keepAlive, timeUnit,
44                 queue, threadFactory, new FlexibleRejectionHandler());
45         executor.prestartAllCoreThreads();
46     }
47
48     /**
49      * Overriding the queue:
50      * ThreadPoolExecutor would not create new threads if the queue is not full, thus adding
51      * occurs in RejectedExecutionHandler.
52      * This impl saturates threadpool first, then queue. When both are full caller will get blocked.
53      */
54     private static ForwardingBlockingQueue getQueue(final OptionalInt capacity) {
55         final BlockingQueue<Runnable> delegate = capacity.isPresent() ? new LinkedBlockingQueue<>(capacity.getAsInt())
56                 : new LinkedBlockingQueue<>();
57         return new ForwardingBlockingQueue(delegate);
58     }
59
60     @Override
61     public ExecutorService getExecutor() {
62         return Executors.unconfigurableExecutorService(executor);
63     }
64
65     public int getMinThreadCount() {
66         return executor.getCorePoolSize();
67     }
68
69     public void setMinThreadCount(final int minThreadCount) {
70         executor.setCorePoolSize(minThreadCount);
71     }
72
73     @Override
74     public int getMaxThreadCount() {
75         return executor.getMaximumPoolSize();
76     }
77
78     public void setMaxThreadCount(final int maxThreadCount) {
79         executor.setMaximumPoolSize(maxThreadCount);
80     }
81
82     public long getKeepAliveMillis() {
83         return executor.getKeepAliveTime(TimeUnit.MILLISECONDS);
84     }
85
86     public void setKeepAliveMillis(final long keepAliveMillis) {
87         executor.setKeepAliveTime(keepAliveMillis, TimeUnit.MILLISECONDS);
88     }
89
90     public void setThreadFactory(final ThreadFactory threadFactory) {
91         executor.setThreadFactory(threadFactory);
92     }
93
94     public void prestartAllCoreThreads() {
95         executor.prestartAllCoreThreads();
96     }
97
98     @Override
99     public void close() {
100         executor.shutdown();
101     }
102
103     /**
104      * if the max threads are met, then it will raise a rejectedExecution. We then push to the queue.
105      */
106     private static class FlexibleRejectionHandler implements RejectedExecutionHandler {
107         @Override
108         public void rejectedExecution(final Runnable r, final ThreadPoolExecutor executor) {
109             try {
110                 executor.getQueue().put(r);
111             } catch (InterruptedException e) {
112                 throw new RejectedExecutionException("Interrupted while waiting on the queue", e);
113             }
114         }
115     }
116
117     private static class ForwardingBlockingQueue
118             extends com.google.common.util.concurrent.ForwardingBlockingQueue<Runnable> {
119         private final BlockingQueue<Runnable> delegate;
120
121         public ForwardingBlockingQueue(final BlockingQueue<Runnable> delegate) {
122             this.delegate = delegate;
123         }
124
125         @Override
126         protected BlockingQueue<Runnable> delegate() {
127             return delegate;
128         }
129
130         @Override
131         public boolean offer(final Runnable r) {
132             // ThreadPoolExecutor will spawn a new thread after core size is reached only
133             // if the queue.offer returns false.
134             return false;
135         }
136     }
137 }