Added tests for yang.model.util
[yangtools.git] / common / util / src / main / java / org / opendaylight / yangtools / util / concurrent / CachedThreadPoolExecutor.java
1 /*
2  * Copyright (c) 2014 Brocade Communications Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.yangtools.util.concurrent;
9
10 import java.util.concurrent.BlockingQueue;
11 import java.util.concurrent.LinkedBlockingQueue;
12 import java.util.concurrent.RejectedExecutionException;
13 import java.util.concurrent.RejectedExecutionHandler;
14 import java.util.concurrent.SynchronousQueue;
15 import java.util.concurrent.ThreadPoolExecutor;
16 import java.util.concurrent.TimeUnit;
17 import java.util.concurrent.atomic.AtomicLong;
18
19 import com.google.common.base.Objects;
20 import com.google.common.base.Objects.ToStringHelper;
21 import com.google.common.base.Preconditions;
22 import com.google.common.util.concurrent.ThreadFactoryBuilder;
23
24 /**
25  * A ThreadPoolExecutor with a specified bounded queue capacity that favors reusing previously
26  * constructed threads, when they are available, over creating new threads.
27  * <p>
28  * See {@link SpecialExecutors#newBoundedCachedThreadPool} for more details.
29  *
30  * @author Thomas Pantelis
31  */
32 public class CachedThreadPoolExecutor extends ThreadPoolExecutor {
33
34     private static final long IDLE_TIMEOUT_IN_SEC = 60L;
35
36     private final AtomicLong largestBackingQueueSize = new AtomicLong( 0 );
37
38     private final ExecutorQueue executorQueue;
39
40     private final String threadPrefix;
41
42     private final int maximumQueueSize;
43
44     private final RejectedTaskHandler rejectedTaskHandler;
45
46     /**
47      * Constructs an instance.
48      *
49      * @param maximumPoolSize
50      *            the maximum number of threads to allow in the pool. Threads will terminate after
51      *            being idle for 60 seconds.
52      * @param maximumQueueSize
53      *            the capacity of the queue.
54      * @param threadPrefix
55      *            the name prefix for threads created by this executor.
56      */
57     public CachedThreadPoolExecutor( int maximumPoolSize, int maximumQueueSize, String threadPrefix ) {
58         // We're using a custom SynchronousQueue that has a backing bounded LinkedBlockingQueue.
59         // We don't specify any core threads (first parameter) so, when a task is submitted,
60         // the base class will always try to offer to the queue. If there is an existing waiting
61         // thread, the offer will succeed and the task will be handed to the thread to execute. If
62         // there's no waiting thread, either because there are no threads in the pool or all threads
63         // are busy, the base class will try to create a new thread. If the maximum thread limit has
64         // been reached, the task will be rejected. We specify a RejectedTaskHandler that tries
65         // to offer to the backing queue. If that succeeds, the task will execute as soon as a
66         // thread becomes available. If the offer fails to the backing queue, the task is rejected.
67         super( 0, maximumPoolSize, IDLE_TIMEOUT_IN_SEC, TimeUnit.SECONDS,
68                new ExecutorQueue( maximumQueueSize ) );
69
70         this.threadPrefix = Preconditions.checkNotNull( threadPrefix );
71         this.maximumQueueSize = maximumQueueSize;
72
73         setThreadFactory( new ThreadFactoryBuilder().setDaemon( true )
74                                             .setNameFormat( this.threadPrefix + "-%d" ).build() );
75
76         executorQueue = (ExecutorQueue)super.getQueue();
77
78         rejectedTaskHandler = new RejectedTaskHandler(
79                 executorQueue.getBackingQueue(), largestBackingQueueSize );
80         super.setRejectedExecutionHandler( rejectedTaskHandler );
81     }
82
83     @Override
84     public void setRejectedExecutionHandler( RejectedExecutionHandler handler ) {
85         rejectedTaskHandler.setDelegateRejectedExecutionHandler( handler );
86     }
87
88     @Override
89     public BlockingQueue<Runnable> getQueue(){
90         return executorQueue.getBackingQueue();
91     }
92
93     public long getLargestQueueSize() {
94         return largestBackingQueueSize.get();
95     }
96
97     protected ToStringHelper addToStringAttributes( ToStringHelper toStringHelper ) {
98         return toStringHelper;
99     }
100
101     @Override
102     public final String toString() {
103         return addToStringAttributes( Objects.toStringHelper( this )
104                 .add( "Thread Prefix", threadPrefix )
105                 .add( "Current Thread Pool Size", getPoolSize() )
106                 .add( "Largest Thread Pool Size", getLargestPoolSize() )
107                 .add( "Max Thread Pool Size", getMaximumPoolSize() )
108                 .add( "Current Queue Size", executorQueue.getBackingQueue().size() )
109                 .add( "Largest Queue Size", getLargestQueueSize() )
110                 .add( "Max Queue Size", maximumQueueSize )
111                 .add( "Active Thread Count", getActiveCount() )
112                 .add( "Completed Task Count", getCompletedTaskCount() )
113                 .add( "Total Task Count", getTaskCount() ) ).toString();
114     }
115
116     /**
117      * A customized SynchronousQueue that has a backing bounded LinkedBlockingQueue. This class
118      * overrides the #poll methods to first try to poll the backing queue for a task. If the backing
119      * queue is empty, it calls the base SynchronousQueue#poll method. In this manner, we get the
120      * thread reuse behavior of the SynchronousQueue with the added ability to queue tasks when all
121      * threads are busy.
122      */
123     private static class ExecutorQueue extends SynchronousQueue<Runnable> {
124
125         private static final long serialVersionUID = 1L;
126
127         private static final long POLL_WAIT_TIME_IN_MS = 300;
128
129         private final LinkedBlockingQueue<Runnable> backingQueue;
130
131         ExecutorQueue( int maxBackingQueueSize ) {
132             backingQueue = new LinkedBlockingQueue<>( maxBackingQueueSize );
133         }
134
135         LinkedBlockingQueue<Runnable> getBackingQueue() {
136             return backingQueue;
137         }
138
139         @Override
140         public Runnable poll( long timeout, TimeUnit unit ) throws InterruptedException {
141             long totalWaitTime = unit.toMillis( timeout );
142             long waitTime = Math.min( totalWaitTime, POLL_WAIT_TIME_IN_MS );
143             Runnable task = null;
144
145             // We loop here, each time polling the backingQueue first then our queue, instead of
146             // polling each just once. This is to handle the following timing edge case:
147             //
148             //   We poll the backingQueue and it's empty but, before the call to super.poll,
149             //   a task is offered but no thread is immediately available and the task is put on the
150             //   backingQueue. There is a slight chance that all the other threads could be at the
151             //   same point, in which case they would all call super.poll and wait. If we only
152             //   called poll once, no thread would execute the task (unless/until another task was
153             //   later submitted). But by looping and breaking the specified timeout into small
154             //   periods, one thread will eventually wake up and get the task from the backingQueue
155             //   and execute it, although slightly delayed.
156
157             while( task == null ) {
158                 // First try to get a task from the backing queue.
159                 task = backingQueue.poll();
160                 if( task == null ) {
161                     // No task in backing - call the base class to wait for one to be offered.
162                     task = super.poll( waitTime, TimeUnit.MILLISECONDS );
163
164                     totalWaitTime -= POLL_WAIT_TIME_IN_MS;
165                     if( totalWaitTime <= 0 ) {
166                         break;
167                     }
168
169                     waitTime = Math.min( totalWaitTime, POLL_WAIT_TIME_IN_MS );
170                 }
171             }
172
173             return task;
174         }
175
176         @Override
177         public Runnable poll() {
178             Runnable task = backingQueue.poll();
179             return task != null ? task : super.poll();
180         }
181     }
182
183     /**
184      * Internal RejectedExecutionHandler that tries to offer rejected tasks to the backing queue.
185      * If the queue is full, we throw a RejectedExecutionException by default. The client can
186      * override this behavior be specifying their own RejectedExecutionHandler, in which case we
187      * delegate to that handler.
188      */
189     private static class RejectedTaskHandler implements RejectedExecutionHandler {
190
191         private final LinkedBlockingQueue<Runnable> backingQueue;
192         private final AtomicLong largestBackingQueueSize;
193         private volatile RejectedExecutionHandler delegateRejectedExecutionHandler;
194
195         RejectedTaskHandler( LinkedBlockingQueue<Runnable> backingQueue,
196                              AtomicLong largestBackingQueueSize ) {
197             this.backingQueue = backingQueue;
198             this.largestBackingQueueSize = largestBackingQueueSize;
199         }
200
201         void setDelegateRejectedExecutionHandler(
202                 RejectedExecutionHandler delegateRejectedExecutionHandler ){
203             this.delegateRejectedExecutionHandler = delegateRejectedExecutionHandler;
204         }
205
206         @Override
207         public void rejectedExecution( Runnable task, ThreadPoolExecutor executor ) {
208             if( executor.isShutdown() ) {
209                 throw new RejectedExecutionException( "Executor has been shutdown." );
210             }
211
212             if( !backingQueue.offer( task ) ) {
213                 if( delegateRejectedExecutionHandler != null ) {
214                     delegateRejectedExecutionHandler.rejectedExecution( task, executor );
215                 } else {
216                     throw new RejectedExecutionException(
217                                                 "All threads are in use and the queue is full" );
218                 }
219             }
220
221             largestBackingQueueSize.incrementAndGet();
222             long size = backingQueue.size();
223             long largest = largestBackingQueueSize.get();
224             if( size > largest ) {
225                 largestBackingQueueSize.compareAndSet( largest, size );
226             }
227         }
228     }
229 }