*/
package org.opendaylight.yangtools.util;
+import com.google.common.util.concurrent.ForwardingBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
-
+import java.util.concurrent.TimeUnit;
+import org.eclipse.jdt.annotation.NonNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.google.common.util.concurrent.ForwardingBlockingQueue;
-
/**
* Utility methods for dealing with {@link ExecutorService}s.
*/
public final class ExecutorServiceUtil {
private static final class WaitInQueueExecutionHandler implements RejectedExecutionHandler {
@Override
+ @SuppressWarnings("checkstyle:parameterName")
public void rejectedExecution(final Runnable r, final ThreadPoolExecutor executor) {
+ if (executor.isShutdown()) {
+ throw new RejectedExecutionException("Executor has been shutdown.");
+ }
+
try {
executor.getQueue().put(r);
} catch (InterruptedException e) {
- LOG.debug("Intterupted while waiting for queue", e);
- throw new RejectedExecutionException("Interrupted while waiting for queue", e);
+ LOG.debug("Interrupted while attempting to put to the queue", e);
+ throw new RejectedExecutionException("Interrupted while attempting to put to the queue", e);
}
}
}
private static final Logger LOG = LoggerFactory.getLogger(ExecutorServiceUtil.class);
- private static final RejectedExecutionHandler WAIT_IN_QUEUE_HANDLER = new WaitInQueueExecutionHandler();
+ private static final @NonNull RejectedExecutionHandler WAIT_IN_QUEUE_HANDLER = new WaitInQueueExecutionHandler();
private ExecutorServiceUtil() {
- throw new UnsupportedOperationException("Utility class");
+ // Hidden on purpose
}
/**
- * Create a {@link BlockingQueue} which does not allow for non-blocking addition to the queue.
+ * Creates a {@link BlockingQueue} which does not allow for non-blocking addition to the queue.
* This is useful with {@link #waitInQueueExecutionHandler()} to turn force a
* {@link ThreadPoolExecutor} to create as many threads as it is configured to before starting
* to fill the queue.
* @param delegate Backing blocking queue.
* @return A new blocking queue backed by the delegate
*/
- public <E> BlockingQueue<E> offerFailingBlockingQueue(final BlockingQueue<E> delegate) {
- return new ForwardingBlockingQueue<E>() {
+ public static <E> @NonNull BlockingQueue<E> offerFailingBlockingQueue(final BlockingQueue<E> delegate) {
+ return new ForwardingBlockingQueue<>() {
@Override
+ @SuppressWarnings("checkstyle:parameterName")
public boolean offer(final E o) {
return false;
}
}
/**
- * Return a {@link RejectedExecutionHandler} which blocks on the {@link ThreadPoolExecutor}'s
+ * Returns a {@link RejectedExecutionHandler} which blocks on the {@link ThreadPoolExecutor}'s
* backing queue if a new thread cannot be spawned.
*
* @return A shared RejectedExecutionHandler instance.
*/
- public RejectedExecutionHandler waitInQueueExecutionHandler() {
+ public static @NonNull RejectedExecutionHandler waitInQueueExecutionHandler() {
return WAIT_IN_QUEUE_HANDLER;
}
+
+ /**
+ * Tries to shutdown the given executor gracefully by awaiting termination for the given
+ * timeout period. If the timeout elapses before termination, the executor is forcefully
+ * shutdown.
+ */
+ public static void tryGracefulShutdown(final @NonNull ExecutorService executor, final long timeout,
+ final @NonNull TimeUnit unit) {
+ executor.shutdown();
+
+ try {
+ if (!executor.awaitTermination(timeout, unit)) {
+ executor.shutdownNow();
+ }
+ } catch (InterruptedException e) {
+ executor.shutdownNow();
+ }
+ }
}