e3f1f40f4ce5a0c960285e1a73832d9c199df553
[yangtools.git] / common / util / src / main / java / org / opendaylight / yangtools / util / concurrent / CachedThreadPoolExecutor.java
1 /*
2  * Copyright (c) 2014 Brocade Communications Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.yangtools.util.concurrent;
9
10 import static java.util.Objects.requireNonNull;
11
12 import com.google.common.base.MoreObjects;
13 import com.google.common.base.MoreObjects.ToStringHelper;
14 import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
15 import java.io.Serial;
16 import java.util.concurrent.BlockingQueue;
17 import java.util.concurrent.LinkedBlockingQueue;
18 import java.util.concurrent.RejectedExecutionException;
19 import java.util.concurrent.RejectedExecutionHandler;
20 import java.util.concurrent.SynchronousQueue;
21 import java.util.concurrent.ThreadPoolExecutor;
22 import java.util.concurrent.TimeUnit;
23 import org.slf4j.LoggerFactory;
24
25 /**
26  * A ThreadPoolExecutor with a specified bounded queue capacity that favors reusing previously
27  * constructed threads, when they are available, over creating new threads.
28  *
29  * <p>See {@link SpecialExecutors#newBoundedCachedThreadPool} for more details.
30  *
31  * @author Thomas Pantelis
32  */
33 public class CachedThreadPoolExecutor extends ThreadPoolExecutor {
34
35     private static final long IDLE_TIMEOUT_IN_SEC = 60L;
36
37     private final ExecutorQueue executorQueue;
38
39     private final String threadPrefix;
40
41     private final int maximumQueueSize;
42
43     private final RejectedTaskHandler rejectedTaskHandler;
44
45     /**
46      * Constructs an instance.
47      *
48      * @param maximumPoolSize
49      *            the maximum number of threads to allow in the pool. Threads will terminate after
50      *            being idle for 60 seconds.
51      * @param maximumQueueSize
52      *            the capacity of the queue.
53      * @param threadPrefix
54      *            the name prefix for threads created by this executor.
55      * @param loggerIdentity
56      *               the class to use as logger name for logging uncaught exceptions from the threads.
57      */
58     // due to loggerIdentity argument usage
59     @SuppressWarnings("checkstyle:LoggerFactoryClassParameter")
60     public CachedThreadPoolExecutor(final int maximumPoolSize, final int maximumQueueSize, final String threadPrefix,
61             final Class<?> loggerIdentity) {
62         // We're using a custom SynchronousQueue that has a backing bounded LinkedBlockingQueue.
63         // We don't specify any core threads (first parameter) so, when a task is submitted,
64         // the base class will always try to offer to the queue. If there is an existing waiting
65         // thread, the offer will succeed and the task will be handed to the thread to execute. If
66         // there's no waiting thread, either because there are no threads in the pool or all threads
67         // are busy, the base class will try to create a new thread. If the maximum thread limit has
68         // been reached, the task will be rejected. We specify a RejectedTaskHandler that tries
69         // to offer to the backing queue. If that succeeds, the task will execute as soon as a
70         // thread becomes available. If the offer fails to the backing queue, the task is rejected.
71         super(0, maximumPoolSize, IDLE_TIMEOUT_IN_SEC, TimeUnit.SECONDS,
72                new ExecutorQueue(maximumQueueSize));
73
74         this.threadPrefix = requireNonNull(threadPrefix);
75         this.maximumQueueSize = maximumQueueSize;
76
77         setThreadFactory(ThreadFactoryProvider.builder().namePrefix(threadPrefix)
78                 .logger(LoggerFactory.getLogger(loggerIdentity)).build().get());
79
80         executorQueue = (ExecutorQueue)super.getQueue();
81
82         rejectedTaskHandler = new RejectedTaskHandler(
83                 executorQueue.getBackingQueue(), CountingRejectedExecutionHandler.newAbortPolicy());
84         super.setRejectedExecutionHandler(rejectedTaskHandler);
85     }
86
87     @Override
88     public void setRejectedExecutionHandler(final RejectedExecutionHandler handler) {
89         rejectedTaskHandler.setDelegateRejectedExecutionHandler(requireNonNull(handler));
90     }
91
92     @Override
93     public RejectedExecutionHandler getRejectedExecutionHandler() {
94         return rejectedTaskHandler.getDelegateRejectedExecutionHandler();
95     }
96
97     @Override
98     public BlockingQueue<Runnable> getQueue() {
99         return executorQueue.getBackingQueue();
100     }
101
102     public long getLargestQueueSize() {
103         return executorQueue.getBackingQueue().getLargestQueueSize();
104     }
105
106     protected ToStringHelper addToStringAttributes(final ToStringHelper toStringHelper) {
107         return toStringHelper;
108     }
109
110     @Override
111     public final String toString() {
112         return addToStringAttributes(MoreObjects.toStringHelper(this)
113                 .add("Thread Prefix", threadPrefix)
114                 .add("Current Thread Pool Size", getPoolSize())
115                 .add("Largest Thread Pool Size", getLargestPoolSize())
116                 .add("Max Thread Pool Size", getMaximumPoolSize())
117                 .add("Current Queue Size", executorQueue.getBackingQueue().size())
118                 .add("Largest Queue Size", getLargestQueueSize())
119                 .add("Max Queue Size", maximumQueueSize)
120                 .add("Active Thread Count", getActiveCount())
121                 .add("Completed Task Count", getCompletedTaskCount())
122                 .add("Total Task Count", getTaskCount())).toString();
123     }
124
125     /**
126      * A customized SynchronousQueue that has a backing bounded LinkedBlockingQueue. This class
127      * overrides the #poll methods to first try to poll the backing queue for a task. If the backing
128      * queue is empty, it calls the base SynchronousQueue#poll method. In this manner, we get the
129      * thread reuse behavior of the SynchronousQueue with the added ability to queue tasks when all
130      * threads are busy.
131      */
132     private static class ExecutorQueue extends SynchronousQueue<Runnable> {
133         @Serial
134         private static final long serialVersionUID = 1L;
135
136         private static final long POLL_WAIT_TIME_IN_MS = 300;
137
138         @SuppressFBWarnings("SE_BAD_FIELD")
139         // Runnable is not Serializable
140         private final TrackingLinkedBlockingQueue<Runnable> backingQueue;
141
142         ExecutorQueue(final int maxBackingQueueSize) {
143             backingQueue = new TrackingLinkedBlockingQueue<>(maxBackingQueueSize);
144         }
145
146         TrackingLinkedBlockingQueue<Runnable> getBackingQueue() {
147             return backingQueue;
148         }
149
150         @Override
151         public Runnable poll(final long timeout, final TimeUnit unit) throws InterruptedException {
152             long totalWaitTime = unit.toMillis(timeout);
153             long waitTime = Math.min(totalWaitTime, POLL_WAIT_TIME_IN_MS);
154             Runnable task = null;
155
156             // We loop here, each time polling the backingQueue first then our queue, instead of
157             // polling each just once. This is to handle the following timing edge case:
158             //
159             //   We poll the backingQueue and it's empty but, before the call to super.poll,
160             //   a task is offered but no thread is immediately available and the task is put on the
161             //   backingQueue. There is a slight chance that all the other threads could be at the
162             //   same point, in which case they would all call super.poll and wait. If we only
163             //   called poll once, no thread would execute the task (unless/until another task was
164             //   later submitted). But by looping and breaking the specified timeout into small
165             //   periods, one thread will eventually wake up and get the task from the backingQueue
166             //   and execute it, although slightly delayed.
167
168             while (task == null) {
169                 // First try to get a task from the backing queue.
170                 task = backingQueue.poll();
171                 if (task == null) {
172                     // No task in backing - call the base class to wait for one to be offered.
173                     task = super.poll(waitTime, TimeUnit.MILLISECONDS);
174
175                     totalWaitTime -= POLL_WAIT_TIME_IN_MS;
176                     if (totalWaitTime <= 0) {
177                         break;
178                     }
179
180                     waitTime = Math.min(totalWaitTime, POLL_WAIT_TIME_IN_MS);
181                 }
182             }
183
184             return task;
185         }
186
187         @Override
188         public Runnable poll() {
189             Runnable task = backingQueue.poll();
190             return task != null ? task : super.poll();
191         }
192     }
193
194     /**
195      * Internal RejectedExecutionHandler that tries to offer rejected tasks to the backing queue.
196      * If the queue is full, we throw a RejectedExecutionException by default. The client can
197      * override this behavior be specifying their own RejectedExecutionHandler, in which case we
198      * delegate to that handler.
199      */
200     private static class RejectedTaskHandler implements RejectedExecutionHandler {
201
202         private final LinkedBlockingQueue<Runnable> backingQueue;
203         private volatile RejectedExecutionHandler delegateRejectedExecutionHandler;
204
205         RejectedTaskHandler(final LinkedBlockingQueue<Runnable> backingQueue,
206                              final RejectedExecutionHandler delegateRejectedExecutionHandler) {
207             this.backingQueue = backingQueue;
208             this.delegateRejectedExecutionHandler = delegateRejectedExecutionHandler;
209         }
210
211         void setDelegateRejectedExecutionHandler(
212                 final RejectedExecutionHandler delegateRejectedExecutionHandler) {
213             this.delegateRejectedExecutionHandler = delegateRejectedExecutionHandler;
214         }
215
216         RejectedExecutionHandler getDelegateRejectedExecutionHandler() {
217             return delegateRejectedExecutionHandler;
218         }
219
220         @Override
221         public void rejectedExecution(final Runnable task, final ThreadPoolExecutor executor) {
222             if (executor.isShutdown()) {
223                 throw new RejectedExecutionException("Executor has been shutdown.");
224             }
225
226             if (!backingQueue.offer(task)) {
227                 delegateRejectedExecutionHandler.rejectedExecution(task, executor);
228             }
229         }
230     }
231 }