/*
 * Copyright (c) 2014 Brocade Communications Systems, Inc. and others. All rights reserved.
 *
 * This program and the accompanying materials are made available under the
 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
 * and is available at http://www.eclipse.org/legal/epl-v10.html
 */
8 package org.opendaylight.yangtools.util.concurrent;
10 import java.util.concurrent.BlockingQueue;
11 import java.util.concurrent.LinkedBlockingQueue;
12 import java.util.concurrent.RejectedExecutionException;
13 import java.util.concurrent.RejectedExecutionHandler;
14 import java.util.concurrent.SynchronousQueue;
15 import java.util.concurrent.ThreadPoolExecutor;
16 import java.util.concurrent.TimeUnit;
17 import java.util.concurrent.atomic.AtomicLong;
19 import com.google.common.base.Objects;
20 import com.google.common.base.Objects.ToStringHelper;
21 import com.google.common.base.Preconditions;
22 import com.google.common.util.concurrent.ThreadFactoryBuilder;
/**
 * A ThreadPoolExecutor with a specified bounded queue capacity that favors reusing previously
 * constructed threads, when they are available, over creating new threads.
 *
 * <p>See {@link SpecialExecutors#newBoundedCachedThreadPool} for more details.
 *
 * @author Thomas Pantelis
 */
32 public class CachedThreadPoolExecutor extends ThreadPoolExecutor {
34 private static final long IDLE_TIMEOUT_IN_SEC = 60L;
36 private final AtomicLong largestBackingQueueSize = new AtomicLong( 0 );
38 private final ExecutorQueue executorQueue;
40 private final String threadPrefix;
42 private final int maximumQueueSize;
44 private final RejectedTaskHandler rejectedTaskHandler;
47 * Constructs an instance.
49 * @param maximumPoolSize
50 * the maximum number of threads to allow in the pool. Threads will terminate after
51 * being idle for 60 seconds.
52 * @param maximumQueueSize
53 * the capacity of the queue.
55 * the name prefix for threads created by this executor.
57 public CachedThreadPoolExecutor( int maximumPoolSize, int maximumQueueSize, String threadPrefix ) {
58 // We're using a custom SynchronousQueue that has a backing bounded LinkedBlockingQueue.
59 // We don't specify any core threads (first parameter) so, when a task is submitted,
60 // the base class will always try to offer to the queue. If there is an existing waiting
61 // thread, the offer will succeed and the task will be handed to the thread to execute. If
62 // there's no waiting thread, either because there are no threads in the pool or all threads
63 // are busy, the base class will try to create a new thread. If the maximum thread limit has
64 // been reached, the task will be rejected. We specify a RejectedTaskHandler that tries
65 // to offer to the backing queue. If that succeeds, the task will execute as soon as a
66 // thread becomes available. If the offer fails to the backing queue, the task is rejected.
67 super( 0, maximumPoolSize, IDLE_TIMEOUT_IN_SEC, TimeUnit.SECONDS,
68 new ExecutorQueue( maximumQueueSize ) );
70 this.threadPrefix = Preconditions.checkNotNull( threadPrefix );
71 this.maximumQueueSize = maximumQueueSize;
73 setThreadFactory( new ThreadFactoryBuilder().setDaemon( true )
74 .setNameFormat( this.threadPrefix + "-%d" ).build() );
76 executorQueue = (ExecutorQueue)super.getQueue();
78 rejectedTaskHandler = new RejectedTaskHandler(
79 executorQueue.getBackingQueue(), largestBackingQueueSize );
80 super.setRejectedExecutionHandler( rejectedTaskHandler );
84 public void setRejectedExecutionHandler( RejectedExecutionHandler handler ) {
85 rejectedTaskHandler.setDelegateRejectedExecutionHandler( handler );
89 public BlockingQueue<Runnable> getQueue(){
90 return executorQueue.getBackingQueue();
93 public long getLargestQueueSize() {
94 return largestBackingQueueSize.get();
97 protected ToStringHelper addToStringAttributes( ToStringHelper toStringHelper ) {
98 return toStringHelper;
102 public final String toString() {
103 return addToStringAttributes( Objects.toStringHelper( this )
104 .add( "Thread Prefix", threadPrefix )
105 .add( "Current Thread Pool Size", getPoolSize() )
106 .add( "Largest Thread Pool Size", getLargestPoolSize() )
107 .add( "Max Thread Pool Size", getMaximumPoolSize() )
108 .add( "Current Queue Size", executorQueue.getBackingQueue().size() )
109 .add( "Largest Queue Size", getLargestQueueSize() )
110 .add( "Max Queue Size", maximumQueueSize )
111 .add( "Active Thread Count", getActiveCount() )
112 .add( "Completed Task Count", getCompletedTaskCount() )
113 .add( "Total Task Count", getTaskCount() ) ).toString();
117 * A customized SynchronousQueue that has a backing bounded LinkedBlockingQueue. This class
118 * overrides the #poll methods to first try to poll the backing queue for a task. If the backing
119 * queue is empty, it calls the base SynchronousQueue#poll method. In this manner, we get the
120 * thread reuse behavior of the SynchronousQueue with the added ability to queue tasks when all
123 private static class ExecutorQueue extends SynchronousQueue<Runnable> {
125 private static final long serialVersionUID = 1L;
127 private static final long POLL_WAIT_TIME_IN_MS = 300;
129 private final LinkedBlockingQueue<Runnable> backingQueue;
131 ExecutorQueue( int maxBackingQueueSize ) {
132 backingQueue = new LinkedBlockingQueue<>( maxBackingQueueSize );
135 LinkedBlockingQueue<Runnable> getBackingQueue() {
140 public Runnable poll( long timeout, TimeUnit unit ) throws InterruptedException {
141 long totalWaitTime = unit.toMillis( timeout );
142 long waitTime = Math.min( totalWaitTime, POLL_WAIT_TIME_IN_MS );
143 Runnable task = null;
145 // We loop here, each time polling the backingQueue first then our queue, instead of
146 // polling each just once. This is to handle the following timing edge case:
148 // We poll the backingQueue and it's empty but, before the call to super.poll,
149 // a task is offered but no thread is immediately available and the task is put on the
150 // backingQueue. There is a slight chance that all the other threads could be at the
151 // same point, in which case they would all call super.poll and wait. If we only
152 // called poll once, no thread would execute the task (unless/until another task was
153 // later submitted). But by looping and breaking the specified timeout into small
154 // periods, one thread will eventually wake up and get the task from the backingQueue
155 // and execute it, although slightly delayed.
157 while( task == null ) {
158 // First try to get a task from the backing queue.
159 task = backingQueue.poll();
161 // No task in backing - call the base class to wait for one to be offered.
162 task = super.poll( waitTime, TimeUnit.MILLISECONDS );
164 totalWaitTime -= POLL_WAIT_TIME_IN_MS;
165 if( totalWaitTime <= 0 ) {
169 waitTime = Math.min( totalWaitTime, POLL_WAIT_TIME_IN_MS );
177 public Runnable poll() {
178 Runnable task = backingQueue.poll();
179 return task != null ? task : super.poll();
184 * Internal RejectedExecutionHandler that tries to offer rejected tasks to the backing queue.
185 * If the queue is full, we throw a RejectedExecutionException by default. The client can
186 * override this behavior be specifying their own RejectedExecutionHandler, in which case we
187 * delegate to that handler.
189 private static class RejectedTaskHandler implements RejectedExecutionHandler {
191 private final LinkedBlockingQueue<Runnable> backingQueue;
192 private final AtomicLong largestBackingQueueSize;
193 private volatile RejectedExecutionHandler delegateRejectedExecutionHandler;
195 RejectedTaskHandler( LinkedBlockingQueue<Runnable> backingQueue,
196 AtomicLong largestBackingQueueSize ) {
197 this.backingQueue = backingQueue;
198 this.largestBackingQueueSize = largestBackingQueueSize;
201 void setDelegateRejectedExecutionHandler(
202 RejectedExecutionHandler delegateRejectedExecutionHandler ){
203 this.delegateRejectedExecutionHandler = delegateRejectedExecutionHandler;
207 public void rejectedExecution( Runnable task, ThreadPoolExecutor executor ) {
208 if( executor.isShutdown() ) {
209 throw new RejectedExecutionException( "Executor has been shutdown." );
212 if( !backingQueue.offer( task ) ) {
213 if( delegateRejectedExecutionHandler != null ) {
214 delegateRejectedExecutionHandler.rejectedExecution( task, executor );
216 throw new RejectedExecutionException(
217 "All threads are in use and the queue is full" );
221 largestBackingQueueSize.incrementAndGet();
222 long size = backingQueue.size();
223 long largest = largestBackingQueueSize.get();
224 if( size > largest ) {
225 largestBackingQueueSize.compareAndSet( largest, size );