b89657a3bd13a895bd929c61c766211b211c7407
[controller.git] / opendaylight / config / threadpool-config-impl / src / main / java / org / opendaylight / controller / config / threadpool / util / FlexibleThreadPoolWrapper.java
1 /*
2  * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.config.threadpool.util;
10
11 import com.google.common.base.Optional;
12 import java.io.Closeable;
13 import java.util.concurrent.BlockingQueue;
14 import java.util.concurrent.ExecutorService;
15 import java.util.concurrent.Executors;
16 import java.util.concurrent.LinkedBlockingQueue;
17 import java.util.concurrent.RejectedExecutionException;
18 import java.util.concurrent.RejectedExecutionHandler;
19 import java.util.concurrent.ThreadFactory;
20 import java.util.concurrent.ThreadPoolExecutor;
21 import java.util.concurrent.TimeUnit;
22 import org.opendaylight.controller.config.threadpool.ThreadPool;
23
24 /**
25  * Implementation of {@link ThreadPool} using flexible number of threads wraps
26  * {@link ExecutorService}.
27  */
28 public class FlexibleThreadPoolWrapper implements ThreadPool, Closeable {
29     private final ThreadPoolExecutor executor;
30
31     public FlexibleThreadPoolWrapper(int minThreadCount, int maxThreadCount, long keepAlive, TimeUnit timeUnit,
32             ThreadFactory threadFactory) {
33         this(minThreadCount, maxThreadCount, keepAlive, timeUnit, threadFactory, getQueue(Optional.<Integer>absent()));
34     }
35
36     public FlexibleThreadPoolWrapper(int minThreadCount, int maxThreadCount, long keepAlive, TimeUnit timeUnit,
37             ThreadFactory threadFactory, Optional<Integer> queueCapacity) {
38         this(minThreadCount, maxThreadCount, keepAlive, timeUnit, threadFactory, getQueue(queueCapacity));
39     }
40
41     private FlexibleThreadPoolWrapper(int minThreadCount, int maxThreadCount, long keepAlive, TimeUnit timeUnit,
42             ThreadFactory threadFactory, BlockingQueue<Runnable> queue) {
43
44         executor = new ThreadPoolExecutor(minThreadCount, maxThreadCount, keepAlive, timeUnit,
45                 queue, threadFactory, new FlexibleRejectionHandler());
46         executor.prestartAllCoreThreads();
47     }
48
49     /**
50      * Overriding the queue:
51      * ThreadPoolExecutor would not create new threads if the queue is not full, thus adding
52      * occurs in RejectedExecutionHandler.
53      * This impl saturates threadpool first, then queue. When both are full caller will get blocked.
54      */
55     private static ForwardingBlockingQueue getQueue(Optional<Integer> capacity) {
56         final BlockingQueue<Runnable> delegate = capacity.isPresent() ? new LinkedBlockingQueue<>(capacity.get()) : new LinkedBlockingQueue<>();
57         return new ForwardingBlockingQueue(delegate);
58     }
59
60     @Override
61     public ExecutorService getExecutor() {
62         return Executors.unconfigurableExecutorService(executor);
63     }
64
65     public int getMinThreadCount() {
66         return executor.getCorePoolSize();
67     }
68
69     public void setMinThreadCount(int minThreadCount) {
70         executor.setCorePoolSize(minThreadCount);
71     }
72
73     @Override
74     public int getMaxThreadCount() {
75         return executor.getMaximumPoolSize();
76     }
77
78     public void setMaxThreadCount(int maxThreadCount) {
79         executor.setMaximumPoolSize(maxThreadCount);
80     }
81
82     public long getKeepAliveMillis() {
83         return executor.getKeepAliveTime(TimeUnit.MILLISECONDS);
84     }
85
86     public void setKeepAliveMillis(long keepAliveMillis) {
87         executor.setKeepAliveTime(keepAliveMillis, TimeUnit.MILLISECONDS);
88     }
89
90     public void setThreadFactory(ThreadFactory threadFactory) {
91         executor.setThreadFactory(threadFactory);
92     }
93
94     public void prestartAllCoreThreads() {
95         executor.prestartAllCoreThreads();
96     }
97
98     @Override
99     public void close() {
100         executor.shutdown();
101     }
102
103     /**
104      * if the max threads are met, then it will raise a rejectedExecution. We then push to the queue.
105      */
106     private static class FlexibleRejectionHandler implements RejectedExecutionHandler {
107         @Override
108         public void rejectedExecution(final Runnable r, final ThreadPoolExecutor executor) {
109             try {
110                 executor.getQueue().put(r);
111             } catch (InterruptedException e) {
112                 throw new RejectedExecutionException("Interrupted while waiting on the queue", e);
113             }
114         }
115     }
116
117     private static class ForwardingBlockingQueue extends com.google.common.util.concurrent.ForwardingBlockingQueue<Runnable> {
118         private final BlockingQueue<Runnable> delegate;
119
120         public ForwardingBlockingQueue(BlockingQueue<Runnable> delegate) {
121             this.delegate = delegate;
122         }
123
124         @Override
125         protected BlockingQueue<Runnable> delegate() {
126             return delegate;
127         }
128
129         @Override
130         public boolean offer(final Runnable r) {
131             // ThreadPoolExecutor will spawn a new thread after core size is reached only
132             // if the queue.offer returns false.
133             return false;
134         }
135     }
136 }