/* * Copyright (c) 2014 Brocade Communications Systems, Inc. and others. All rights reserved. * * This program and the accompanying materials are made available under the * terms of the Eclipse Public License v1.0 which accompanies this distribution, * and is available at http://www.eclipse.org/legal/epl-v10.html */ package org.opendaylight.yangtools.util.concurrent; import static java.util.Objects.requireNonNull; import com.google.common.base.MoreObjects; import com.google.common.base.MoreObjects.ToStringHelper; import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.RejectedExecutionHandler; import java.util.concurrent.SynchronousQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import org.slf4j.LoggerFactory; /** * A ThreadPoolExecutor with a specified bounded queue capacity that favors reusing previously * constructed threads, when they are available, over creating new threads. * *
* <p>See {@link SpecialExecutors#newBoundedCachedThreadPool} for more details.
*
* @author Thomas Pantelis
*/
public class CachedThreadPoolExecutor extends ThreadPoolExecutor {
// Keep-alive time, in seconds, handed to the base ThreadPoolExecutor for idle threads.
private static final long IDLE_TIMEOUT_IN_SEC = 60L;
// The queue installed on the base class, retained here with its concrete ExecutorQueue type.
private final ExecutorQueue executorQueue;
// Name prefix given to the thread factory for threads created by this executor.
private final String threadPrefix;
// Capacity value passed to the ExecutorQueue at construction time.
private final int maximumQueueSize;
// Handler registered with the base class; it wraps the delegate handler that
// setRejectedExecutionHandler/getRejectedExecutionHandler expose to callers.
private final RejectedTaskHandler rejectedTaskHandler;
/**
 * Constructs an instance.
 *
 * @param maximumPoolSize
 *            the maximum number of threads to allow in the pool. Threads will terminate after
 *            being idle for 60 seconds.
 * @param maximumQueueSize
 *            the capacity of the queue.
 * @param threadPrefix
 *            the name prefix for threads created by this executor, must not be null.
 * @param loggerIdentity
 *            the class to use as logger name for logging uncaught exceptions from the threads.
 * @throws NullPointerException
 *             if {@code threadPrefix} is null.
 */
// due to loggerIdentity argument usage
@SuppressWarnings("checkstyle:LoggerFactoryClassParameter")
public CachedThreadPoolExecutor(final int maximumPoolSize, final int maximumQueueSize, final String threadPrefix,
        final Class<?> loggerIdentity) {
    // We're using a custom SynchronousQueue that has a backing bounded LinkedBlockingQueue.
    // We don't specify any core threads (first parameter) so, when a task is submitted,
    // the base class will always try to offer to the queue. If there is an existing waiting
    // thread, the offer will succeed and the task will be handed to the thread to execute. If
    // there's no waiting thread, either because there are no threads in the pool or all threads
    // are busy, the base class will try to create a new thread. If the maximum thread limit has
    // been reached, the task will be rejected. We specify a RejectedTaskHandler that tries
    // to offer to the backing queue. If that succeeds, the task will execute as soon as a
    // thread becomes available. If the offer fails to the backing queue, the task is rejected.
    super(0, maximumPoolSize, IDLE_TIMEOUT_IN_SEC, TimeUnit.SECONDS,
            new ExecutorQueue(maximumQueueSize));

    this.threadPrefix = requireNonNull(threadPrefix);
    this.maximumQueueSize = maximumQueueSize;

    // Invoke the base-class setter directly: this class is not final, and an overriding
    // setThreadFactory() in a subclass must not run against a partially constructed instance.
    super.setThreadFactory(ThreadFactoryProvider.builder().namePrefix(threadPrefix)
            .logger(LoggerFactory.getLogger(loggerIdentity)).build().get());

    // The queue handed to the super constructor above is an ExecutorQueue, so the cast is safe.
    // super.getQueue() is used because this class overrides getQueue().
    executorQueue = (ExecutorQueue) super.getQueue();

    // The handler installed on the base class is always our RejectedTaskHandler; the handler
    // visible to callers is its delegate, which initially is a counting abort policy.
    rejectedTaskHandler = new RejectedTaskHandler(
            executorQueue.getBackingQueue(), CountingRejectedExecutionHandler.newAbortPolicy());
    super.setRejectedExecutionHandler(rejectedTaskHandler);
}
/**
 * Replaces the delegate handler used by the internal {@code RejectedTaskHandler}. The handler
 * registered with the base class itself is left untouched.
 *
 * @param handler the new delegate handler, must not be null
 * @throws NullPointerException if {@code handler} is null
 */
@Override
public void setRejectedExecutionHandler(final RejectedExecutionHandler handler) {
    final RejectedExecutionHandler delegate = requireNonNull(handler);
    rejectedTaskHandler.setDelegateRejectedExecutionHandler(delegate);
}
/**
 * Returns the delegate handler held by the internal {@code RejectedTaskHandler}, rather than
 * the wrapper handler that is registered with the base class.
 *
 * @return the current delegate rejected-execution handler
 */
@Override
public RejectedExecutionHandler getRejectedExecutionHandler() {
    final RejectedExecutionHandler delegate = rejectedTaskHandler.getDelegateRejectedExecutionHandler();
    return delegate;
}
@Override
public BlockingQueue