Scripted update of if statements
[yangtools.git] / common / util / src / main / java / org / opendaylight / yangtools / util / concurrent / CachedThreadPoolExecutor.java
1 /*
2  * Copyright (c) 2014 Brocade Communications Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.yangtools.util.concurrent;
9
10 import com.google.common.base.MoreObjects;
11 import com.google.common.base.MoreObjects.ToStringHelper;
12 import com.google.common.base.Preconditions;
13 import com.google.common.util.concurrent.ThreadFactoryBuilder;
14 import java.util.concurrent.BlockingQueue;
15 import java.util.concurrent.LinkedBlockingQueue;
16 import java.util.concurrent.RejectedExecutionException;
17 import java.util.concurrent.RejectedExecutionHandler;
18 import java.util.concurrent.SynchronousQueue;
19 import java.util.concurrent.ThreadPoolExecutor;
20 import java.util.concurrent.TimeUnit;
21
22 /**
23  * A ThreadPoolExecutor with a specified bounded queue capacity that favors reusing previously
24  * constructed threads, when they are available, over creating new threads.
25  * <p>
26  * See {@link SpecialExecutors#newBoundedCachedThreadPool} for more details.
27  *
28  * @author Thomas Pantelis
29  */
30 public class CachedThreadPoolExecutor extends ThreadPoolExecutor {
31
32     private static final long IDLE_TIMEOUT_IN_SEC = 60L;
33
34     private final ExecutorQueue executorQueue;
35
36     private final String threadPrefix;
37
38     private final int maximumQueueSize;
39
40     private final RejectedTaskHandler rejectedTaskHandler;
41
42     /**
43      * Constructs an instance.
44      *
45      * @param maximumPoolSize
46      *            the maximum number of threads to allow in the pool. Threads will terminate after
47      *            being idle for 60 seconds.
48      * @param maximumQueueSize
49      *            the capacity of the queue.
50      * @param threadPrefix
51      *            the name prefix for threads created by this executor.
52      */
53     public CachedThreadPoolExecutor( final int maximumPoolSize, final int maximumQueueSize, final String threadPrefix ) {
54         // We're using a custom SynchronousQueue that has a backing bounded LinkedBlockingQueue.
55         // We don't specify any core threads (first parameter) so, when a task is submitted,
56         // the base class will always try to offer to the queue. If there is an existing waiting
57         // thread, the offer will succeed and the task will be handed to the thread to execute. If
58         // there's no waiting thread, either because there are no threads in the pool or all threads
59         // are busy, the base class will try to create a new thread. If the maximum thread limit has
60         // been reached, the task will be rejected. We specify a RejectedTaskHandler that tries
61         // to offer to the backing queue. If that succeeds, the task will execute as soon as a
62         // thread becomes available. If the offer fails to the backing queue, the task is rejected.
63         super( 0, maximumPoolSize, IDLE_TIMEOUT_IN_SEC, TimeUnit.SECONDS,
64                new ExecutorQueue( maximumQueueSize ) );
65
66         this.threadPrefix = Preconditions.checkNotNull( threadPrefix );
67         this.maximumQueueSize = maximumQueueSize;
68
69         setThreadFactory( new ThreadFactoryBuilder().setDaemon( true )
70                                             .setNameFormat( this.threadPrefix + "-%d" ).build() );
71
72         executorQueue = (ExecutorQueue)super.getQueue();
73
74         rejectedTaskHandler = new RejectedTaskHandler(
75                 executorQueue.getBackingQueue(), CountingRejectedExecutionHandler.newAbortPolicy() );
76         super.setRejectedExecutionHandler( rejectedTaskHandler );
77     }
78
79     @Override
80     public void setRejectedExecutionHandler( final RejectedExecutionHandler handler ) {
81         Preconditions.checkNotNull( handler );
82         rejectedTaskHandler.setDelegateRejectedExecutionHandler( handler );
83     }
84
85     @Override
86     public RejectedExecutionHandler getRejectedExecutionHandler() {
87         return rejectedTaskHandler.getDelegateRejectedExecutionHandler();
88     }
89
90     @Override
91     public BlockingQueue<Runnable> getQueue() {
92         return executorQueue.getBackingQueue();
93     }
94
95     public long getLargestQueueSize() {
96         return ((TrackingLinkedBlockingQueue<?>)executorQueue.getBackingQueue()).getLargestQueueSize();
97     }
98
99     protected ToStringHelper addToStringAttributes( final ToStringHelper toStringHelper ) {
100         return toStringHelper;
101     }
102
103     @Override
104     public final String toString() {
105         return addToStringAttributes( MoreObjects.toStringHelper( this )
106                 .add( "Thread Prefix", threadPrefix )
107                 .add( "Current Thread Pool Size", getPoolSize() )
108                 .add( "Largest Thread Pool Size", getLargestPoolSize() )
109                 .add( "Max Thread Pool Size", getMaximumPoolSize() )
110                 .add( "Current Queue Size", executorQueue.getBackingQueue().size() )
111                 .add( "Largest Queue Size", getLargestQueueSize() )
112                 .add( "Max Queue Size", maximumQueueSize )
113                 .add( "Active Thread Count", getActiveCount() )
114                 .add( "Completed Task Count", getCompletedTaskCount() )
115                 .add( "Total Task Count", getTaskCount() ) ).toString();
116     }
117
118     /**
119      * A customized SynchronousQueue that has a backing bounded LinkedBlockingQueue. This class
120      * overrides the #poll methods to first try to poll the backing queue for a task. If the backing
121      * queue is empty, it calls the base SynchronousQueue#poll method. In this manner, we get the
122      * thread reuse behavior of the SynchronousQueue with the added ability to queue tasks when all
123      * threads are busy.
124      */
125     private static class ExecutorQueue extends SynchronousQueue<Runnable> {
126
127         private static final long serialVersionUID = 1L;
128
129         private static final long POLL_WAIT_TIME_IN_MS = 300;
130
131         private final LinkedBlockingQueue<Runnable> backingQueue;
132
133         ExecutorQueue( final int maxBackingQueueSize ) {
134             backingQueue = new TrackingLinkedBlockingQueue<>( maxBackingQueueSize );
135         }
136
137         LinkedBlockingQueue<Runnable> getBackingQueue() {
138             return backingQueue;
139         }
140
141         @Override
142         public Runnable poll( final long timeout, final TimeUnit unit ) throws InterruptedException {
143             long totalWaitTime = unit.toMillis( timeout );
144             long waitTime = Math.min( totalWaitTime, POLL_WAIT_TIME_IN_MS );
145             Runnable task = null;
146
147             // We loop here, each time polling the backingQueue first then our queue, instead of
148             // polling each just once. This is to handle the following timing edge case:
149             //
150             //   We poll the backingQueue and it's empty but, before the call to super.poll,
151             //   a task is offered but no thread is immediately available and the task is put on the
152             //   backingQueue. There is a slight chance that all the other threads could be at the
153             //   same point, in which case they would all call super.poll and wait. If we only
154             //   called poll once, no thread would execute the task (unless/until another task was
155             //   later submitted). But by looping and breaking the specified timeout into small
156             //   periods, one thread will eventually wake up and get the task from the backingQueue
157             //   and execute it, although slightly delayed.
158
159             while (task == null) {
160                 // First try to get a task from the backing queue.
161                 task = backingQueue.poll();
162                 if (task == null) {
163                     // No task in backing - call the base class to wait for one to be offered.
164                     task = super.poll( waitTime, TimeUnit.MILLISECONDS );
165
166                     totalWaitTime -= POLL_WAIT_TIME_IN_MS;
167                     if (totalWaitTime <= 0 ) {
168                         break;
169                     }
170
171                     waitTime = Math.min( totalWaitTime, POLL_WAIT_TIME_IN_MS );
172                 }
173             }
174
175             return task;
176         }
177
178         @Override
179         public Runnable poll() {
180             Runnable task = backingQueue.poll();
181             return task != null ? task : super.poll();
182         }
183     }
184
185     /**
186      * Internal RejectedExecutionHandler that tries to offer rejected tasks to the backing queue.
187      * If the queue is full, we throw a RejectedExecutionException by default. The client can
188      * override this behavior be specifying their own RejectedExecutionHandler, in which case we
189      * delegate to that handler.
190      */
191     private static class RejectedTaskHandler implements RejectedExecutionHandler {
192
193         private final LinkedBlockingQueue<Runnable> backingQueue;
194         private volatile RejectedExecutionHandler delegateRejectedExecutionHandler;
195
196         RejectedTaskHandler( final LinkedBlockingQueue<Runnable> backingQueue,
197                              final RejectedExecutionHandler delegateRejectedExecutionHandler ) {
198             this.backingQueue = backingQueue;
199             this.delegateRejectedExecutionHandler = delegateRejectedExecutionHandler;
200         }
201
202         void setDelegateRejectedExecutionHandler(
203                 final RejectedExecutionHandler delegateRejectedExecutionHandler ) {
204             this.delegateRejectedExecutionHandler = delegateRejectedExecutionHandler;
205         }
206
207         RejectedExecutionHandler getDelegateRejectedExecutionHandler(){
208             return delegateRejectedExecutionHandler;
209         }
210
211         @Override
212         public void rejectedExecution( final Runnable task, final ThreadPoolExecutor executor ) {
213             if (executor.isShutdown()) {
214                 throw new RejectedExecutionException( "Executor has been shutdown." );
215             }
216
217             if (!backingQueue.offer(task)) {
218                 delegateRejectedExecutionHandler.rejectedExecution( task, executor );
219             }
220         }
221     }
222 }