/*
 * Copyright (c) 2014 Brocade Communications Systems, Inc. and others. All rights reserved.
 *
 * This program and the accompanying materials are made available under the
 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
 * and is available at http://www.eclipse.org/legal/epl-v10.html
 */
8 package org.opendaylight.yangtools.util.concurrent;
10 import com.google.common.base.MoreObjects;
11 import com.google.common.base.MoreObjects.ToStringHelper;
12 import com.google.common.base.Preconditions;
13 import com.google.common.util.concurrent.ThreadFactoryBuilder;
14 import java.util.concurrent.BlockingQueue;
15 import java.util.concurrent.LinkedBlockingQueue;
16 import java.util.concurrent.RejectedExecutionException;
17 import java.util.concurrent.RejectedExecutionHandler;
18 import java.util.concurrent.SynchronousQueue;
19 import java.util.concurrent.ThreadPoolExecutor;
20 import java.util.concurrent.TimeUnit;
/**
 * A ThreadPoolExecutor with a specified bounded queue capacity that favors reusing previously
 * constructed threads, when they are available, over creating new threads.
 *
 * <p>See {@link SpecialExecutors#newBoundedCachedThreadPool} for more details.
 *
 * @author Thomas Pantelis
 */
30 public class CachedThreadPoolExecutor extends ThreadPoolExecutor {
32 private static final long IDLE_TIMEOUT_IN_SEC = 60L;
34 private final ExecutorQueue executorQueue;
36 private final String threadPrefix;
38 private final int maximumQueueSize;
40 private final RejectedTaskHandler rejectedTaskHandler;
43 * Constructs an instance.
45 * @param maximumPoolSize
46 * the maximum number of threads to allow in the pool. Threads will terminate after
47 * being idle for 60 seconds.
48 * @param maximumQueueSize
49 * the capacity of the queue.
51 * the name prefix for threads created by this executor.
53 public CachedThreadPoolExecutor( final int maximumPoolSize, final int maximumQueueSize, final String threadPrefix ) {
54 // We're using a custom SynchronousQueue that has a backing bounded LinkedBlockingQueue.
55 // We don't specify any core threads (first parameter) so, when a task is submitted,
56 // the base class will always try to offer to the queue. If there is an existing waiting
57 // thread, the offer will succeed and the task will be handed to the thread to execute. If
58 // there's no waiting thread, either because there are no threads in the pool or all threads
59 // are busy, the base class will try to create a new thread. If the maximum thread limit has
60 // been reached, the task will be rejected. We specify a RejectedTaskHandler that tries
61 // to offer to the backing queue. If that succeeds, the task will execute as soon as a
62 // thread becomes available. If the offer fails to the backing queue, the task is rejected.
63 super( 0, maximumPoolSize, IDLE_TIMEOUT_IN_SEC, TimeUnit.SECONDS,
64 new ExecutorQueue( maximumQueueSize ) );
66 this.threadPrefix = Preconditions.checkNotNull( threadPrefix );
67 this.maximumQueueSize = maximumQueueSize;
69 setThreadFactory( new ThreadFactoryBuilder().setDaemon( true )
70 .setNameFormat( this.threadPrefix + "-%d" ).build() );
72 executorQueue = (ExecutorQueue)super.getQueue();
74 rejectedTaskHandler = new RejectedTaskHandler(
75 executorQueue.getBackingQueue(), CountingRejectedExecutionHandler.newAbortPolicy() );
76 super.setRejectedExecutionHandler( rejectedTaskHandler );
80 public void setRejectedExecutionHandler( final RejectedExecutionHandler handler ) {
81 Preconditions.checkNotNull( handler );
82 rejectedTaskHandler.setDelegateRejectedExecutionHandler( handler );
86 public RejectedExecutionHandler getRejectedExecutionHandler() {
87 return rejectedTaskHandler.getDelegateRejectedExecutionHandler();
91 public BlockingQueue<Runnable> getQueue() {
92 return executorQueue.getBackingQueue();
95 public long getLargestQueueSize() {
96 return ((TrackingLinkedBlockingQueue<?>)executorQueue.getBackingQueue()).getLargestQueueSize();
99 protected ToStringHelper addToStringAttributes( final ToStringHelper toStringHelper ) {
100 return toStringHelper;
104 public final String toString() {
105 return addToStringAttributes( MoreObjects.toStringHelper( this )
106 .add( "Thread Prefix", threadPrefix )
107 .add( "Current Thread Pool Size", getPoolSize() )
108 .add( "Largest Thread Pool Size", getLargestPoolSize() )
109 .add( "Max Thread Pool Size", getMaximumPoolSize() )
110 .add( "Current Queue Size", executorQueue.getBackingQueue().size() )
111 .add( "Largest Queue Size", getLargestQueueSize() )
112 .add( "Max Queue Size", maximumQueueSize )
113 .add( "Active Thread Count", getActiveCount() )
114 .add( "Completed Task Count", getCompletedTaskCount() )
115 .add( "Total Task Count", getTaskCount() ) ).toString();
119 * A customized SynchronousQueue that has a backing bounded LinkedBlockingQueue. This class
120 * overrides the #poll methods to first try to poll the backing queue for a task. If the backing
121 * queue is empty, it calls the base SynchronousQueue#poll method. In this manner, we get the
122 * thread reuse behavior of the SynchronousQueue with the added ability to queue tasks when all
125 private static class ExecutorQueue extends SynchronousQueue<Runnable> {
127 private static final long serialVersionUID = 1L;
129 private static final long POLL_WAIT_TIME_IN_MS = 300;
131 private final LinkedBlockingQueue<Runnable> backingQueue;
133 ExecutorQueue( final int maxBackingQueueSize ) {
134 backingQueue = new TrackingLinkedBlockingQueue<>( maxBackingQueueSize );
137 LinkedBlockingQueue<Runnable> getBackingQueue() {
142 public Runnable poll( final long timeout, final TimeUnit unit ) throws InterruptedException {
143 long totalWaitTime = unit.toMillis( timeout );
144 long waitTime = Math.min( totalWaitTime, POLL_WAIT_TIME_IN_MS );
145 Runnable task = null;
147 // We loop here, each time polling the backingQueue first then our queue, instead of
148 // polling each just once. This is to handle the following timing edge case:
150 // We poll the backingQueue and it's empty but, before the call to super.poll,
151 // a task is offered but no thread is immediately available and the task is put on the
152 // backingQueue. There is a slight chance that all the other threads could be at the
153 // same point, in which case they would all call super.poll and wait. If we only
154 // called poll once, no thread would execute the task (unless/until another task was
155 // later submitted). But by looping and breaking the specified timeout into small
156 // periods, one thread will eventually wake up and get the task from the backingQueue
157 // and execute it, although slightly delayed.
159 while (task == null) {
160 // First try to get a task from the backing queue.
161 task = backingQueue.poll();
163 // No task in backing - call the base class to wait for one to be offered.
164 task = super.poll( waitTime, TimeUnit.MILLISECONDS );
166 totalWaitTime -= POLL_WAIT_TIME_IN_MS;
167 if (totalWaitTime <= 0 ) {
171 waitTime = Math.min( totalWaitTime, POLL_WAIT_TIME_IN_MS );
179 public Runnable poll() {
180 Runnable task = backingQueue.poll();
181 return task != null ? task : super.poll();
186 * Internal RejectedExecutionHandler that tries to offer rejected tasks to the backing queue.
187 * If the queue is full, we throw a RejectedExecutionException by default. The client can
188 * override this behavior be specifying their own RejectedExecutionHandler, in which case we
189 * delegate to that handler.
191 private static class RejectedTaskHandler implements RejectedExecutionHandler {
193 private final LinkedBlockingQueue<Runnable> backingQueue;
194 private volatile RejectedExecutionHandler delegateRejectedExecutionHandler;
196 RejectedTaskHandler( final LinkedBlockingQueue<Runnable> backingQueue,
197 final RejectedExecutionHandler delegateRejectedExecutionHandler ) {
198 this.backingQueue = backingQueue;
199 this.delegateRejectedExecutionHandler = delegateRejectedExecutionHandler;
202 void setDelegateRejectedExecutionHandler(
203 final RejectedExecutionHandler delegateRejectedExecutionHandler ) {
204 this.delegateRejectedExecutionHandler = delegateRejectedExecutionHandler;
207 RejectedExecutionHandler getDelegateRejectedExecutionHandler(){
208 return delegateRejectedExecutionHandler;
212 public void rejectedExecution( final Runnable task, final ThreadPoolExecutor executor ) {
213 if (executor.isShutdown()) {
214 throw new RejectedExecutionException( "Executor has been shutdown." );
217 if (!backingQueue.offer(task)) {
218 delegateRejectedExecutionHandler.rejectedExecution( task, executor );