Remove support for actions/rpc/notifications, take two
[controller.git] / opendaylight / config / threadpool-config-impl / src / main / java / org / opendaylight / controller / config / threadpool / util / FlexibleThreadPoolWrapper.java
1 /*
2  * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.config.threadpool.util;
9
10 import java.io.Closeable;
11 import java.util.OptionalInt;
12 import java.util.concurrent.BlockingQueue;
13 import java.util.concurrent.ExecutorService;
14 import java.util.concurrent.Executors;
15 import java.util.concurrent.LinkedBlockingQueue;
16 import java.util.concurrent.RejectedExecutionException;
17 import java.util.concurrent.RejectedExecutionHandler;
18 import java.util.concurrent.ThreadFactory;
19 import java.util.concurrent.ThreadPoolExecutor;
20 import java.util.concurrent.TimeUnit;
21 import org.opendaylight.controller.config.threadpool.ThreadPool;
22
23 /**
24  * Implementation of {@link ThreadPool} using flexible number of threads wraps
25  * {@link ExecutorService}.
26  */
27 public class FlexibleThreadPoolWrapper implements ThreadPool, Closeable {
28     private final ThreadPoolExecutor executor;
29
30     public FlexibleThreadPoolWrapper(final int minThreadCount, final int maxThreadCount, final long keepAlive,
31             final TimeUnit timeUnit, final ThreadFactory threadFactory) {
32         this(minThreadCount, maxThreadCount, keepAlive, timeUnit, threadFactory, getQueue(OptionalInt.empty()));
33     }
34
35     public FlexibleThreadPoolWrapper(final int minThreadCount, final int maxThreadCount, final long keepAlive,
36             final TimeUnit timeUnit, final ThreadFactory threadFactory, final OptionalInt queueCapacity) {
37         this(minThreadCount, maxThreadCount, keepAlive, timeUnit, threadFactory, getQueue(queueCapacity));
38     }
39
40     private FlexibleThreadPoolWrapper(final int minThreadCount, final int maxThreadCount, final long keepAlive,
41             final TimeUnit timeUnit, final ThreadFactory threadFactory, final BlockingQueue<Runnable> queue) {
42
43         executor = new ThreadPoolExecutor(minThreadCount, maxThreadCount, keepAlive, timeUnit,
44                 queue, threadFactory, new FlexibleRejectionHandler());
45         executor.prestartAllCoreThreads();
46     }
47
48     /**
49      * Overriding the queue:
50      * ThreadPoolExecutor would not create new threads if the queue is not full, thus adding
51      * occurs in RejectedExecutionHandler.
52      * This impl saturates threadpool first, then queue. When both are full caller will get blocked.
53      */
54     private static ForwardingBlockingQueue getQueue(final OptionalInt capacity) {
55         return new ForwardingBlockingQueue(
56             capacity.isPresent() ? new LinkedBlockingQueue<>(capacity.orElseThrow()) : new LinkedBlockingQueue<>());
57     }
58
59     @Override
60     public ExecutorService getExecutor() {
61         return Executors.unconfigurableExecutorService(executor);
62     }
63
64     public int getMinThreadCount() {
65         return executor.getCorePoolSize();
66     }
67
68     public void setMinThreadCount(final int minThreadCount) {
69         executor.setCorePoolSize(minThreadCount);
70     }
71
72     @Override
73     public int getMaxThreadCount() {
74         return executor.getMaximumPoolSize();
75     }
76
77     public void setMaxThreadCount(final int maxThreadCount) {
78         executor.setMaximumPoolSize(maxThreadCount);
79     }
80
81     public long getKeepAliveMillis() {
82         return executor.getKeepAliveTime(TimeUnit.MILLISECONDS);
83     }
84
85     public void setKeepAliveMillis(final long keepAliveMillis) {
86         executor.setKeepAliveTime(keepAliveMillis, TimeUnit.MILLISECONDS);
87     }
88
89     public void setThreadFactory(final ThreadFactory threadFactory) {
90         executor.setThreadFactory(threadFactory);
91     }
92
93     public void prestartAllCoreThreads() {
94         executor.prestartAllCoreThreads();
95     }
96
97     @Override
98     public void close() {
99         executor.shutdown();
100     }
101
102     /**
103      * if the max threads are met, then it will raise a rejectedExecution. We then push to the queue.
104      */
105     private static final class FlexibleRejectionHandler implements RejectedExecutionHandler {
106         @Override
107         @SuppressWarnings("checkstyle:parameterName")
108         public void rejectedExecution(final Runnable r, final ThreadPoolExecutor executor) {
109             try {
110                 executor.getQueue().put(r);
111             } catch (InterruptedException e) {
112                 throw new RejectedExecutionException("Interrupted while waiting on the queue", e);
113             }
114         }
115     }
116
117     private static final class ForwardingBlockingQueue
118             extends com.google.common.util.concurrent.ForwardingBlockingQueue<Runnable> {
119         private final BlockingQueue<Runnable> delegate;
120
121         ForwardingBlockingQueue(final BlockingQueue<Runnable> delegate) {
122             this.delegate = delegate;
123         }
124
125         @Override
126         protected BlockingQueue<Runnable> delegate() {
127             return delegate;
128         }
129
130         @Override
131         @SuppressWarnings("checkstyle:parameterName")
132         public boolean offer(final Runnable o) {
133             // ThreadPoolExecutor will spawn a new thread after core size is reached only
134             // if the queue.offer returns false.
135             return false;
136         }
137     }
138 }