Fix FindBugs violations and enable enforcement in utils
[yangtools.git] / common / util / src / main / java / org / opendaylight / yangtools / util / concurrent / CachedThreadPoolExecutor.java
1 /*
2  * Copyright (c) 2014 Brocade Communications Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.yangtools.util.concurrent;
9
10 import static java.util.Objects.requireNonNull;
11
12 import com.google.common.base.MoreObjects;
13 import com.google.common.base.MoreObjects.ToStringHelper;
14 import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
15 import java.util.concurrent.BlockingQueue;
16 import java.util.concurrent.LinkedBlockingQueue;
17 import java.util.concurrent.RejectedExecutionException;
18 import java.util.concurrent.RejectedExecutionHandler;
19 import java.util.concurrent.SynchronousQueue;
20 import java.util.concurrent.ThreadPoolExecutor;
21 import java.util.concurrent.TimeUnit;
22 import org.slf4j.LoggerFactory;
23
24 /**
25  * A ThreadPoolExecutor with a specified bounded queue capacity that favors reusing previously
26  * constructed threads, when they are available, over creating new threads.
27  *
28  * <p>See {@link SpecialExecutors#newBoundedCachedThreadPool} for more details.
29  *
30  * @author Thomas Pantelis
31  */
32 public class CachedThreadPoolExecutor extends ThreadPoolExecutor {
33
34     private static final long IDLE_TIMEOUT_IN_SEC = 60L;
35
36     private final ExecutorQueue executorQueue;
37
38     private final String threadPrefix;
39
40     private final int maximumQueueSize;
41
42     private final RejectedTaskHandler rejectedTaskHandler;
43
44     /**
45      * Constructs an instance.
46      *
47      * @param maximumPoolSize
48      *            the maximum number of threads to allow in the pool. Threads will terminate after
49      *            being idle for 60 seconds.
50      * @param maximumQueueSize
51      *            the capacity of the queue.
52      * @param threadPrefix
53      *            the name prefix for threads created by this executor.
54      * @param loggerIdentity
55      *               the class to use as logger name for logging uncaught exceptions from the threads.
56      */
57     public CachedThreadPoolExecutor(final int maximumPoolSize, final int maximumQueueSize, final String threadPrefix,
58             Class<?> loggerIdentity) {
59         // We're using a custom SynchronousQueue that has a backing bounded LinkedBlockingQueue.
60         // We don't specify any core threads (first parameter) so, when a task is submitted,
61         // the base class will always try to offer to the queue. If there is an existing waiting
62         // thread, the offer will succeed and the task will be handed to the thread to execute. If
63         // there's no waiting thread, either because there are no threads in the pool or all threads
64         // are busy, the base class will try to create a new thread. If the maximum thread limit has
65         // been reached, the task will be rejected. We specify a RejectedTaskHandler that tries
66         // to offer to the backing queue. If that succeeds, the task will execute as soon as a
67         // thread becomes available. If the offer fails to the backing queue, the task is rejected.
68         super(0, maximumPoolSize, IDLE_TIMEOUT_IN_SEC, TimeUnit.SECONDS,
69                new ExecutorQueue(maximumQueueSize));
70
71         this.threadPrefix = requireNonNull(threadPrefix);
72         this.maximumQueueSize = maximumQueueSize;
73
74         setThreadFactory(ThreadFactoryProvider.builder().namePrefix(threadPrefix)
75                 .logger(LoggerFactory.getLogger(loggerIdentity)).build().get());
76
77         executorQueue = (ExecutorQueue)super.getQueue();
78
79         rejectedTaskHandler = new RejectedTaskHandler(
80                 executorQueue.getBackingQueue(), CountingRejectedExecutionHandler.newAbortPolicy());
81         super.setRejectedExecutionHandler(rejectedTaskHandler);
82     }
83
84     /**
85      * Constructor.
86      * @deprecated Please use {@link #CachedThreadPoolExecutor(int, int, String, Class)} instead.
87      */
88     @Deprecated
89     public CachedThreadPoolExecutor(final int maximumPoolSize, final int maximumQueueSize, final String threadPrefix) {
90         this(maximumPoolSize, maximumQueueSize, threadPrefix, CachedThreadPoolExecutor.class);
91     }
92
93     @Override
94     public void setRejectedExecutionHandler(final RejectedExecutionHandler handler) {
95         rejectedTaskHandler.setDelegateRejectedExecutionHandler(requireNonNull(handler));
96     }
97
98     @Override
99     public RejectedExecutionHandler getRejectedExecutionHandler() {
100         return rejectedTaskHandler.getDelegateRejectedExecutionHandler();
101     }
102
103     @Override
104     public BlockingQueue<Runnable> getQueue() {
105         return executorQueue.getBackingQueue();
106     }
107
108     public long getLargestQueueSize() {
109         return executorQueue.getBackingQueue().getLargestQueueSize();
110     }
111
112     protected ToStringHelper addToStringAttributes(final ToStringHelper toStringHelper) {
113         return toStringHelper;
114     }
115
116     @Override
117     public final String toString() {
118         return addToStringAttributes(MoreObjects.toStringHelper(this)
119                 .add("Thread Prefix", threadPrefix)
120                 .add("Current Thread Pool Size", getPoolSize())
121                 .add("Largest Thread Pool Size", getLargestPoolSize())
122                 .add("Max Thread Pool Size", getMaximumPoolSize())
123                 .add("Current Queue Size", executorQueue.getBackingQueue().size())
124                 .add("Largest Queue Size", getLargestQueueSize())
125                 .add("Max Queue Size", maximumQueueSize)
126                 .add("Active Thread Count", getActiveCount())
127                 .add("Completed Task Count", getCompletedTaskCount())
128                 .add("Total Task Count", getTaskCount())).toString();
129     }
130
131     /**
132      * A customized SynchronousQueue that has a backing bounded LinkedBlockingQueue. This class
133      * overrides the #poll methods to first try to poll the backing queue for a task. If the backing
134      * queue is empty, it calls the base SynchronousQueue#poll method. In this manner, we get the
135      * thread reuse behavior of the SynchronousQueue with the added ability to queue tasks when all
136      * threads are busy.
137      */
138     private static class ExecutorQueue extends SynchronousQueue<Runnable> {
139
140         private static final long serialVersionUID = 1L;
141
142         private static final long POLL_WAIT_TIME_IN_MS = 300;
143
144         @SuppressFBWarnings("SE_BAD_FIELD") // Runnable is not Serializable
145         private final TrackingLinkedBlockingQueue<Runnable> backingQueue;
146
147         ExecutorQueue(final int maxBackingQueueSize) {
148             backingQueue = new TrackingLinkedBlockingQueue<>(maxBackingQueueSize);
149         }
150
151         TrackingLinkedBlockingQueue<Runnable> getBackingQueue() {
152             return backingQueue;
153         }
154
155         @Override
156         public Runnable poll(final long timeout, final TimeUnit unit) throws InterruptedException {
157             long totalWaitTime = unit.toMillis(timeout);
158             long waitTime = Math.min(totalWaitTime, POLL_WAIT_TIME_IN_MS);
159             Runnable task = null;
160
161             // We loop here, each time polling the backingQueue first then our queue, instead of
162             // polling each just once. This is to handle the following timing edge case:
163             //
164             //   We poll the backingQueue and it's empty but, before the call to super.poll,
165             //   a task is offered but no thread is immediately available and the task is put on the
166             //   backingQueue. There is a slight chance that all the other threads could be at the
167             //   same point, in which case they would all call super.poll and wait. If we only
168             //   called poll once, no thread would execute the task (unless/until another task was
169             //   later submitted). But by looping and breaking the specified timeout into small
170             //   periods, one thread will eventually wake up and get the task from the backingQueue
171             //   and execute it, although slightly delayed.
172
173             while (task == null) {
174                 // First try to get a task from the backing queue.
175                 task = backingQueue.poll();
176                 if (task == null) {
177                     // No task in backing - call the base class to wait for one to be offered.
178                     task = super.poll(waitTime, TimeUnit.MILLISECONDS);
179
180                     totalWaitTime -= POLL_WAIT_TIME_IN_MS;
181                     if (totalWaitTime <= 0) {
182                         break;
183                     }
184
185                     waitTime = Math.min(totalWaitTime, POLL_WAIT_TIME_IN_MS);
186                 }
187             }
188
189             return task;
190         }
191
192         @Override
193         public Runnable poll() {
194             Runnable task = backingQueue.poll();
195             return task != null ? task : super.poll();
196         }
197     }
198
199     /**
200      * Internal RejectedExecutionHandler that tries to offer rejected tasks to the backing queue.
201      * If the queue is full, we throw a RejectedExecutionException by default. The client can
202      * override this behavior be specifying their own RejectedExecutionHandler, in which case we
203      * delegate to that handler.
204      */
205     private static class RejectedTaskHandler implements RejectedExecutionHandler {
206
207         private final LinkedBlockingQueue<Runnable> backingQueue;
208         private volatile RejectedExecutionHandler delegateRejectedExecutionHandler;
209
210         RejectedTaskHandler(final LinkedBlockingQueue<Runnable> backingQueue,
211                              final RejectedExecutionHandler delegateRejectedExecutionHandler) {
212             this.backingQueue = backingQueue;
213             this.delegateRejectedExecutionHandler = delegateRejectedExecutionHandler;
214         }
215
216         void setDelegateRejectedExecutionHandler(
217                 final RejectedExecutionHandler delegateRejectedExecutionHandler) {
218             this.delegateRejectedExecutionHandler = delegateRejectedExecutionHandler;
219         }
220
221         RejectedExecutionHandler getDelegateRejectedExecutionHandler() {
222             return delegateRejectedExecutionHandler;
223         }
224
225         @Override
226         public void rejectedExecution(final Runnable task, final ThreadPoolExecutor executor) {
227             if (executor.isShutdown()) {
228                 throw new RejectedExecutionException("Executor has been shutdown.");
229             }
230
231             if (!backingQueue.offer(task)) {
232                 delegateRejectedExecutionHandler.rejectedExecution(task, executor);
233             }
234         }
235     }
236 }