Mechanical code cleanup (config)
[controller.git] / opendaylight / config / threadpool-config-impl / src / main / java / org / opendaylight / controller / config / threadpool / util / FlexibleThreadPoolWrapper.java
1 /*
2  * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.config.threadpool.util;
10
11 import com.google.common.base.Optional;
12 import java.io.Closeable;
13 import java.io.IOException;
14 import java.util.concurrent.BlockingQueue;
15 import java.util.concurrent.ExecutorService;
16 import java.util.concurrent.Executors;
17 import java.util.concurrent.LinkedBlockingQueue;
18 import java.util.concurrent.RejectedExecutionException;
19 import java.util.concurrent.RejectedExecutionHandler;
20 import java.util.concurrent.ThreadFactory;
21 import java.util.concurrent.ThreadPoolExecutor;
22 import java.util.concurrent.TimeUnit;
23 import org.opendaylight.controller.config.threadpool.ThreadPool;
24
25 /**
26  * Implementation of {@link ThreadPool} using flexible number of threads wraps
27  * {@link ExecutorService}.
28  */
29 public class FlexibleThreadPoolWrapper implements ThreadPool, Closeable {
30     private final ThreadPoolExecutor executor;
31
32     public FlexibleThreadPoolWrapper(int minThreadCount, int maxThreadCount, long keepAlive, TimeUnit timeUnit,
33             ThreadFactory threadFactory) {
34         this(minThreadCount, maxThreadCount, keepAlive, timeUnit, threadFactory, getQueue(Optional.<Integer>absent()));
35     }
36
37     public FlexibleThreadPoolWrapper(int minThreadCount, int maxThreadCount, long keepAlive, TimeUnit timeUnit,
38             ThreadFactory threadFactory, Optional<Integer> queueCapacity) {
39         this(minThreadCount, maxThreadCount, keepAlive, timeUnit, threadFactory, getQueue(queueCapacity));
40     }
41
42     private FlexibleThreadPoolWrapper(int minThreadCount, int maxThreadCount, long keepAlive, TimeUnit timeUnit,
43             ThreadFactory threadFactory, BlockingQueue<Runnable> queue) {
44
45         executor = new ThreadPoolExecutor(minThreadCount, maxThreadCount, keepAlive, timeUnit,
46                 queue, threadFactory, new FlexibleRejectionHandler());
47         executor.prestartAllCoreThreads();
48     }
49
50     /**
51      * Overriding the queue:
52      * ThreadPoolExecutor would not create new threads if the queue is not full, thus adding
53      * occurs in RejectedExecutionHandler.
54      * This impl saturates threadpool first, then queue. When both are full caller will get blocked.
55      */
56     private static ForwardingBlockingQueue getQueue(Optional<Integer> capacity) {
57         final BlockingQueue<Runnable> delegate = capacity.isPresent() ? new LinkedBlockingQueue<>(capacity.get()) : new LinkedBlockingQueue<>();
58         return new ForwardingBlockingQueue(delegate);
59     }
60
61     @Override
62     public ExecutorService getExecutor() {
63         return Executors.unconfigurableExecutorService(executor);
64     }
65
66     public int getMinThreadCount() {
67         return executor.getCorePoolSize();
68     }
69
70     public void setMinThreadCount(int minThreadCount) {
71         executor.setCorePoolSize(minThreadCount);
72     }
73
74     @Override
75     public int getMaxThreadCount() {
76         return executor.getMaximumPoolSize();
77     }
78
79     public void setMaxThreadCount(int maxThreadCount) {
80         executor.setMaximumPoolSize(maxThreadCount);
81     }
82
83     public long getKeepAliveMillis() {
84         return executor.getKeepAliveTime(TimeUnit.MILLISECONDS);
85     }
86
87     public void setKeepAliveMillis(long keepAliveMillis) {
88         executor.setKeepAliveTime(keepAliveMillis, TimeUnit.MILLISECONDS);
89     }
90
91     public void setThreadFactory(ThreadFactory threadFactory) {
92         executor.setThreadFactory(threadFactory);
93     }
94
95     public void prestartAllCoreThreads() {
96         executor.prestartAllCoreThreads();
97     }
98
99     @Override
100     public void close() throws IOException {
101         executor.shutdown();
102     }
103
104     /**
105      * if the max threads are met, then it will raise a rejectedExecution. We then push to the queue.
106      */
107     private static class FlexibleRejectionHandler implements RejectedExecutionHandler {
108         @Override
109         public void rejectedExecution(final Runnable r, final ThreadPoolExecutor executor) {
110             try {
111                 executor.getQueue().put(r);
112             } catch (InterruptedException e) {
113                 throw new RejectedExecutionException("Interrupted while waiting on the queue", e);
114             }
115         }
116     }
117
118     private static class ForwardingBlockingQueue extends com.google.common.util.concurrent.ForwardingBlockingQueue<Runnable> {
119         private final BlockingQueue<Runnable> delegate;
120
121         public ForwardingBlockingQueue(BlockingQueue<Runnable> delegate) {
122             this.delegate = delegate;
123         }
124
125         @Override
126         protected BlockingQueue<Runnable> delegate() {
127             return delegate;
128         }
129
130         @Override
131         public boolean offer(final Runnable r) {
132             // ThreadPoolExecutor will spawn a new thread after core size is reached only
133             // if the queue.offer returns false.
134             return false;
135         }
136     }
137 }

©2013 OpenDaylight, A Linux Foundation Collaborative Project. All Rights Reserved.
OpenDaylight is a registered trademark of The OpenDaylight Project, Inc.
Linux Foundation and OpenDaylight are registered trademarks of the Linux Foundation.
Linux is a registered trademark of Linus Torvalds.