cff95ff221075f23920ca1004a09e954e2852dce
[yangtools.git] / common / util / src / main / java / org / opendaylight / yangtools / util / concurrent / CachedThreadPoolExecutor.java
1 /*
2  * Copyright (c) 2014 Brocade Communications Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.yangtools.util.concurrent;
9
10 import static java.util.Objects.requireNonNull;
11
12 import com.google.common.base.MoreObjects;
13 import com.google.common.base.MoreObjects.ToStringHelper;
14 import java.util.concurrent.BlockingQueue;
15 import java.util.concurrent.LinkedBlockingQueue;
16 import java.util.concurrent.RejectedExecutionException;
17 import java.util.concurrent.RejectedExecutionHandler;
18 import java.util.concurrent.SynchronousQueue;
19 import java.util.concurrent.ThreadPoolExecutor;
20 import java.util.concurrent.TimeUnit;
21 import org.slf4j.LoggerFactory;
22
23 /**
24  * A ThreadPoolExecutor with a specified bounded queue capacity that favors reusing previously
25  * constructed threads, when they are available, over creating new threads.
26  *
27  * <p>See {@link SpecialExecutors#newBoundedCachedThreadPool} for more details.
28  *
29  * @author Thomas Pantelis
30  */
31 public class CachedThreadPoolExecutor extends ThreadPoolExecutor {
32
33     private static final long IDLE_TIMEOUT_IN_SEC = 60L;
34
35     private final ExecutorQueue executorQueue;
36
37     private final String threadPrefix;
38
39     private final int maximumQueueSize;
40
41     private final RejectedTaskHandler rejectedTaskHandler;
42
43     /**
44      * Constructs an instance.
45      *
46      * @param maximumPoolSize
47      *            the maximum number of threads to allow in the pool. Threads will terminate after
48      *            being idle for 60 seconds.
49      * @param maximumQueueSize
50      *            the capacity of the queue.
51      * @param threadPrefix
52      *            the name prefix for threads created by this executor.
53      * @param loggerIdentity
54      *               the class to use as logger name for logging uncaught exceptions from the threads.
55      */
56     public CachedThreadPoolExecutor(final int maximumPoolSize, final int maximumQueueSize, final String threadPrefix,
57             Class<?> loggerIdentity) {
58         // We're using a custom SynchronousQueue that has a backing bounded LinkedBlockingQueue.
59         // We don't specify any core threads (first parameter) so, when a task is submitted,
60         // the base class will always try to offer to the queue. If there is an existing waiting
61         // thread, the offer will succeed and the task will be handed to the thread to execute. If
62         // there's no waiting thread, either because there are no threads in the pool or all threads
63         // are busy, the base class will try to create a new thread. If the maximum thread limit has
64         // been reached, the task will be rejected. We specify a RejectedTaskHandler that tries
65         // to offer to the backing queue. If that succeeds, the task will execute as soon as a
66         // thread becomes available. If the offer fails to the backing queue, the task is rejected.
67         super(0, maximumPoolSize, IDLE_TIMEOUT_IN_SEC, TimeUnit.SECONDS,
68                new ExecutorQueue(maximumQueueSize));
69
70         this.threadPrefix = requireNonNull(threadPrefix);
71         this.maximumQueueSize = maximumQueueSize;
72
73         setThreadFactory(ThreadFactoryProvider.builder().namePrefix(threadPrefix)
74                 .logger(LoggerFactory.getLogger(loggerIdentity)).build().get());
75
76         executorQueue = (ExecutorQueue)super.getQueue();
77
78         rejectedTaskHandler = new RejectedTaskHandler(
79                 executorQueue.getBackingQueue(), CountingRejectedExecutionHandler.newAbortPolicy());
80         super.setRejectedExecutionHandler(rejectedTaskHandler);
81     }
82
83     /**
84      * Constructor.
85      * @deprecated Please use {@link #CachedThreadPoolExecutor(int, int, String, Class)} instead.
86      */
87     @Deprecated
88     public CachedThreadPoolExecutor(final int maximumPoolSize, final int maximumQueueSize, final String threadPrefix) {
89         this(maximumPoolSize, maximumQueueSize, threadPrefix, CachedThreadPoolExecutor.class);
90     }
91
92     @Override
93     public void setRejectedExecutionHandler(final RejectedExecutionHandler handler) {
94         rejectedTaskHandler.setDelegateRejectedExecutionHandler(requireNonNull(handler));
95     }
96
97     @Override
98     public RejectedExecutionHandler getRejectedExecutionHandler() {
99         return rejectedTaskHandler.getDelegateRejectedExecutionHandler();
100     }
101
102     @Override
103     public BlockingQueue<Runnable> getQueue() {
104         return executorQueue.getBackingQueue();
105     }
106
107     public long getLargestQueueSize() {
108         return ((TrackingLinkedBlockingQueue<?>)executorQueue.getBackingQueue()).getLargestQueueSize();
109     }
110
111     protected ToStringHelper addToStringAttributes(final ToStringHelper toStringHelper) {
112         return toStringHelper;
113     }
114
115     @Override
116     public final String toString() {
117         return addToStringAttributes(MoreObjects.toStringHelper(this)
118                 .add("Thread Prefix", threadPrefix)
119                 .add("Current Thread Pool Size", getPoolSize())
120                 .add("Largest Thread Pool Size", getLargestPoolSize())
121                 .add("Max Thread Pool Size", getMaximumPoolSize())
122                 .add("Current Queue Size", executorQueue.getBackingQueue().size())
123                 .add("Largest Queue Size", getLargestQueueSize())
124                 .add("Max Queue Size", maximumQueueSize)
125                 .add("Active Thread Count", getActiveCount())
126                 .add("Completed Task Count", getCompletedTaskCount())
127                 .add("Total Task Count", getTaskCount())).toString();
128     }
129
130     /**
131      * A customized SynchronousQueue that has a backing bounded LinkedBlockingQueue. This class
132      * overrides the #poll methods to first try to poll the backing queue for a task. If the backing
133      * queue is empty, it calls the base SynchronousQueue#poll method. In this manner, we get the
134      * thread reuse behavior of the SynchronousQueue with the added ability to queue tasks when all
135      * threads are busy.
136      */
137     private static class ExecutorQueue extends SynchronousQueue<Runnable> {
138
139         private static final long serialVersionUID = 1L;
140
141         private static final long POLL_WAIT_TIME_IN_MS = 300;
142
143         private final LinkedBlockingQueue<Runnable> backingQueue;
144
145         ExecutorQueue(final int maxBackingQueueSize) {
146             backingQueue = new TrackingLinkedBlockingQueue<>(maxBackingQueueSize);
147         }
148
149         LinkedBlockingQueue<Runnable> getBackingQueue() {
150             return backingQueue;
151         }
152
153         @Override
154         public Runnable poll(final long timeout, final TimeUnit unit) throws InterruptedException {
155             long totalWaitTime = unit.toMillis(timeout);
156             long waitTime = Math.min(totalWaitTime, POLL_WAIT_TIME_IN_MS);
157             Runnable task = null;
158
159             // We loop here, each time polling the backingQueue first then our queue, instead of
160             // polling each just once. This is to handle the following timing edge case:
161             //
162             //   We poll the backingQueue and it's empty but, before the call to super.poll,
163             //   a task is offered but no thread is immediately available and the task is put on the
164             //   backingQueue. There is a slight chance that all the other threads could be at the
165             //   same point, in which case they would all call super.poll and wait. If we only
166             //   called poll once, no thread would execute the task (unless/until another task was
167             //   later submitted). But by looping and breaking the specified timeout into small
168             //   periods, one thread will eventually wake up and get the task from the backingQueue
169             //   and execute it, although slightly delayed.
170
171             while (task == null) {
172                 // First try to get a task from the backing queue.
173                 task = backingQueue.poll();
174                 if (task == null) {
175                     // No task in backing - call the base class to wait for one to be offered.
176                     task = super.poll(waitTime, TimeUnit.MILLISECONDS);
177
178                     totalWaitTime -= POLL_WAIT_TIME_IN_MS;
179                     if (totalWaitTime <= 0) {
180                         break;
181                     }
182
183                     waitTime = Math.min(totalWaitTime, POLL_WAIT_TIME_IN_MS);
184                 }
185             }
186
187             return task;
188         }
189
190         @Override
191         public Runnable poll() {
192             Runnable task = backingQueue.poll();
193             return task != null ? task : super.poll();
194         }
195     }
196
197     /**
198      * Internal RejectedExecutionHandler that tries to offer rejected tasks to the backing queue.
199      * If the queue is full, we throw a RejectedExecutionException by default. The client can
200      * override this behavior be specifying their own RejectedExecutionHandler, in which case we
201      * delegate to that handler.
202      */
203     private static class RejectedTaskHandler implements RejectedExecutionHandler {
204
205         private final LinkedBlockingQueue<Runnable> backingQueue;
206         private volatile RejectedExecutionHandler delegateRejectedExecutionHandler;
207
208         RejectedTaskHandler(final LinkedBlockingQueue<Runnable> backingQueue,
209                              final RejectedExecutionHandler delegateRejectedExecutionHandler) {
210             this.backingQueue = backingQueue;
211             this.delegateRejectedExecutionHandler = delegateRejectedExecutionHandler;
212         }
213
214         void setDelegateRejectedExecutionHandler(
215                 final RejectedExecutionHandler delegateRejectedExecutionHandler) {
216             this.delegateRejectedExecutionHandler = delegateRejectedExecutionHandler;
217         }
218
219         RejectedExecutionHandler getDelegateRejectedExecutionHandler() {
220             return delegateRejectedExecutionHandler;
221         }
222
223         @Override
224         public void rejectedExecution(final Runnable task, final ThreadPoolExecutor executor) {
225             if (executor.isShutdown()) {
226                 throw new RejectedExecutionException("Executor has been shutdown.");
227             }
228
229             if (!backingQueue.offer(task)) {
230                 delegateRejectedExecutionHandler.rejectedExecution(task, executor);
231             }
232         }
233     }
234 }