Merge "Bug 1369: Use separate single threadpool for data operations on binding mountp...
[controller.git] / opendaylight / config / threadpool-config-impl / src / main / java / org / opendaylight / controller / config / threadpool / util / FlexibleThreadPoolWrapper.java
1 /*
2  * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.config.threadpool.util;
10
11 import java.io.Closeable;
12 import java.io.IOException;
13 import java.util.concurrent.BlockingQueue;
14 import java.util.concurrent.ExecutorService;
15 import java.util.concurrent.Executors;
16 import java.util.concurrent.LinkedBlockingQueue;
17 import java.util.concurrent.RejectedExecutionException;
18 import java.util.concurrent.RejectedExecutionHandler;
19 import java.util.concurrent.ThreadFactory;
20 import java.util.concurrent.ThreadPoolExecutor;
21 import java.util.concurrent.TimeUnit;
22
23 import org.opendaylight.controller.config.threadpool.ThreadPool;
24
25 import com.google.common.base.Optional;
26
27 /**
28  * Implementation of {@link ThreadPool} using flexible number of threads wraps
29  * {@link ExecutorService}.
30  */
31 public class FlexibleThreadPoolWrapper implements ThreadPool, Closeable {
32     private final ThreadPoolExecutor executor;
33
34     public FlexibleThreadPoolWrapper(int minThreadCount, int maxThreadCount, long keepAlive, TimeUnit timeUnit,
35             ThreadFactory threadFactory) {
36         this(minThreadCount, maxThreadCount, keepAlive, timeUnit, threadFactory, getQueue(Optional.<Integer>absent()));
37     }
38
39     public FlexibleThreadPoolWrapper(int minThreadCount, int maxThreadCount, long keepAlive, TimeUnit timeUnit,
40             ThreadFactory threadFactory, Optional<Integer> queueCapacity) {
41         this(minThreadCount, maxThreadCount, keepAlive, timeUnit, threadFactory, getQueue(queueCapacity));
42     }
43
44     private FlexibleThreadPoolWrapper(int minThreadCount, int maxThreadCount, long keepAlive, TimeUnit timeUnit,
45             ThreadFactory threadFactory, BlockingQueue<Runnable> queue) {
46
47         executor = new ThreadPoolExecutor(minThreadCount, maxThreadCount, keepAlive, timeUnit,
48                 queue, threadFactory, new FlexibleRejectionHandler());
49         executor.prestartAllCoreThreads();
50     }
51
52     /**
53      * Overriding the queue:
54      * ThreadPoolExecutor would not create new threads if the queue is not full, thus adding
55      * occurs in RejectedExecutionHandler.
56      * This impl saturates threadpool first, then queue. When both are full caller will get blocked.
57      */
58     private static ForwardingBlockingQueue getQueue(Optional<Integer> capacity) {
59         final BlockingQueue<Runnable> delegate = capacity.isPresent() ? new LinkedBlockingQueue<Runnable>(capacity.get()) : new LinkedBlockingQueue<Runnable>();
60         return new ForwardingBlockingQueue(delegate);
61     }
62
63     @Override
64     public ExecutorService getExecutor() {
65         return Executors.unconfigurableExecutorService(executor);
66     }
67
68     public int getMinThreadCount() {
69         return executor.getCorePoolSize();
70     }
71
72     public void setMinThreadCount(int minThreadCount) {
73         executor.setCorePoolSize(minThreadCount);
74     }
75
76     @Override
77     public int getMaxThreadCount() {
78         return executor.getMaximumPoolSize();
79     }
80
81     public void setMaxThreadCount(int maxThreadCount) {
82         executor.setMaximumPoolSize(maxThreadCount);
83     }
84
85     public long getKeepAliveMillis() {
86         return executor.getKeepAliveTime(TimeUnit.MILLISECONDS);
87     }
88
89     public void setKeepAliveMillis(long keepAliveMillis) {
90         executor.setKeepAliveTime(keepAliveMillis, TimeUnit.MILLISECONDS);
91     }
92
93     public void setThreadFactory(ThreadFactory threadFactory) {
94         executor.setThreadFactory(threadFactory);
95     }
96
97     public void prestartAllCoreThreads() {
98         executor.prestartAllCoreThreads();
99     }
100
101     @Override
102     public void close() throws IOException {
103         executor.shutdown();
104     }
105
106     /**
107      * if the max threads are met, then it will raise a rejectedExecution. We then push to the queue.
108      */
109     private static class FlexibleRejectionHandler implements RejectedExecutionHandler {
110         @Override
111         public void rejectedExecution(final Runnable r, final ThreadPoolExecutor executor) {
112             try {
113                 executor.getQueue().put(r);
114             } catch (InterruptedException e) {
115                 throw new RejectedExecutionException("Interrupted while waiting on the queue", e);
116             }
117         }
118     }
119
120     private static class ForwardingBlockingQueue extends com.google.common.util.concurrent.ForwardingBlockingQueue<Runnable> {
121         private final BlockingQueue<Runnable> delegate;
122
123         public ForwardingBlockingQueue(BlockingQueue<Runnable> delegate) {
124             this.delegate = delegate;
125         }
126
127         @Override
128         protected BlockingQueue<Runnable> delegate() {
129             return delegate;
130         }
131
132         @Override
133         public boolean offer(final Runnable r) {
134             // ThreadPoolExecutor will spawn a new thread after core size is reached only
135             // if the queue.offer returns false.
136             return false;
137         }
138     }
139 }