From 8fdc09336d2df5712dbaafed4e43ac8ec657a824 Mon Sep 17 00:00:00 2001 From: Michael Vorburger Date: Thu, 28 Sep 2017 23:32:19 +0200 Subject: [PATCH] SpecialExecutors with LoggingThreadUncaughtExceptionHandler Change-Id: I133bbf474a60681f061968a8f9ed1ed32671fd6a Signed-off-by: Michael Vorburger --- common/util/pom.xml | 48 ++++++------ .../concurrent/CachedThreadPoolExecutor.java | 20 ++++- .../concurrent/FastThreadPoolExecutor.java | 39 +++++++++- ...LoggingThreadUncaughtExceptionHandler.java | 41 ++++++++++ .../util/concurrent/SpecialExecutors.java | 77 ++++++++++++++++--- .../concurrent/ThreadFactoryProvider.java | 68 ++++++++++++++++ 6 files changed, 253 insertions(+), 40 deletions(-) create mode 100644 common/util/src/main/java/org/opendaylight/yangtools/util/concurrent/LoggingThreadUncaughtExceptionHandler.java create mode 100644 common/util/src/main/java/org/opendaylight/yangtools/util/concurrent/ThreadFactoryProvider.java diff --git a/common/util/pom.xml b/common/util/pom.xml index 6f0259193f..3b2b1e4787 100644 --- a/common/util/pom.xml +++ b/common/util/pom.xml @@ -46,6 +46,10 @@ com.google.guava guava + + org.immutables + value + junit @@ -64,31 +68,31 @@ - - - - org.apache.maven.plugins - maven-checkstyle-plugin - - checkstyle.violationSeverity=error - - - - + + + + org.apache.maven.plugins + maven-checkstyle-plugin + + checkstyle.violationSeverity=error + + + + - ${odl.site.url}/${project.groupId}/${stream}/${project.artifactId}/ - - - opendaylight-site - ${nexus.site.url}/${project.artifactId}/ - - + + + opendaylight-site + ${nexus.site.url}/${project.artifactId}/ + + diff --git a/common/util/src/main/java/org/opendaylight/yangtools/util/concurrent/CachedThreadPoolExecutor.java b/common/util/src/main/java/org/opendaylight/yangtools/util/concurrent/CachedThreadPoolExecutor.java index 1419080509..cff95ff221 100644 --- a/common/util/src/main/java/org/opendaylight/yangtools/util/concurrent/CachedThreadPoolExecutor.java +++ b/common/util/src/main/java/org/opendaylight/yangtools/util/concurrent/CachedThreadPoolExecutor.java @@ -11,7 +11,6 @@ import static java.util.Objects.requireNonNull; import com.google.common.base.MoreObjects; import com.google.common.base.MoreObjects.ToStringHelper; -import com.google.common.util.concurrent.ThreadFactoryBuilder; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.RejectedExecutionException; @@ -19,6 +18,7 @@ import java.util.concurrent.RejectedExecutionHandler; import java.util.concurrent.SynchronousQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; +import org.slf4j.LoggerFactory; /** * A ThreadPoolExecutor with a specified bounded queue capacity that favors reusing previously @@ -50,8 +50,11 @@ public class CachedThreadPoolExecutor extends ThreadPoolExecutor { * the capacity of the queue. * @param threadPrefix * the name prefix for threads created by this executor. + * @param loggerIdentity + * the class to use as logger name for logging uncaught exceptions from the threads. */ - public CachedThreadPoolExecutor(final int maximumPoolSize, final int maximumQueueSize, final String threadPrefix) { + public CachedThreadPoolExecutor(final int maximumPoolSize, final int maximumQueueSize, final String threadPrefix, + Class loggerIdentity) { // We're using a custom SynchronousQueue that has a backing bounded LinkedBlockingQueue. // We don't specify any core threads (first parameter) so, when a task is submitted, // the base class will always try to offer to the queue. If there is an existing waiting @@ -67,8 +70,8 @@ public class CachedThreadPoolExecutor extends ThreadPoolExecutor { this.threadPrefix = requireNonNull(threadPrefix); this.maximumQueueSize = maximumQueueSize; - setThreadFactory(new ThreadFactoryBuilder().setDaemon(true) - .setNameFormat(this.threadPrefix + "-%d").build()); + setThreadFactory(ThreadFactoryProvider.builder().namePrefix(threadPrefix) + .logger(LoggerFactory.getLogger(loggerIdentity)).build().get()); executorQueue = (ExecutorQueue)super.getQueue(); @@ -77,6 +80,15 @@ public class CachedThreadPoolExecutor extends ThreadPoolExecutor { super.setRejectedExecutionHandler(rejectedTaskHandler); } + /** + * Constructor. + * @deprecated Please use {@link #CachedThreadPoolExecutor(int, int, String, Class)} instead. + */ + @Deprecated + public CachedThreadPoolExecutor(final int maximumPoolSize, final int maximumQueueSize, final String threadPrefix) { + this(maximumPoolSize, maximumQueueSize, threadPrefix, CachedThreadPoolExecutor.class); + } + @Override public void setRejectedExecutionHandler(final RejectedExecutionHandler handler) { rejectedTaskHandler.setDelegateRejectedExecutionHandler(requireNonNull(handler)); diff --git a/common/util/src/main/java/org/opendaylight/yangtools/util/concurrent/FastThreadPoolExecutor.java b/common/util/src/main/java/org/opendaylight/yangtools/util/concurrent/FastThreadPoolExecutor.java index f5e6a1803f..b317944b69 100644 --- a/common/util/src/main/java/org/opendaylight/yangtools/util/concurrent/FastThreadPoolExecutor.java +++ b/common/util/src/main/java/org/opendaylight/yangtools/util/concurrent/FastThreadPoolExecutor.java @@ -10,9 +10,9 @@ package org.opendaylight.yangtools.util.concurrent; import com.google.common.base.MoreObjects; import com.google.common.base.MoreObjects.ToStringHelper; -import com.google.common.util.concurrent.ThreadFactoryBuilder; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; +import org.slf4j.LoggerFactory; /** * A ThreadPoolExecutor with a specified bounded queue capacity that favors creating new threads @@ -39,12 +39,41 @@ public class FastThreadPoolExecutor extends ThreadPoolExecutor { * the capacity of the queue. * @param threadPrefix * the name prefix for threads created by this executor. + * @param loggerIdentity + * the class to use as logger name for logging uncaught exceptions from the threads. */ + public FastThreadPoolExecutor(final int maximumPoolSize, final int maximumQueueSize, final String threadPrefix, + Class loggerIdentity) { + this(maximumPoolSize, maximumQueueSize, DEFAULT_IDLE_TIMEOUT_IN_SEC, TimeUnit.SECONDS, + threadPrefix, loggerIdentity); + } + + /** + * Constructor. + * + * @deprecated Please use + * {@link #FastThreadPoolExecutor(int, int, String, Class)} + * instead. + */ + @Deprecated public FastThreadPoolExecutor(final int maximumPoolSize, final int maximumQueueSize, final String threadPrefix) { this(maximumPoolSize, maximumQueueSize, DEFAULT_IDLE_TIMEOUT_IN_SEC, TimeUnit.SECONDS, threadPrefix); } + /** + * Constructor. + * + * @deprecated Please use + * {@link #FastThreadPoolExecutor(int, int, long, TimeUnit, String, Class)} + * instead. + */ + @Deprecated + public FastThreadPoolExecutor(final int maximumPoolSize, final int maximumQueueSize, final long keepAliveTime, + final TimeUnit unit, final String threadPrefix) { + this(maximumPoolSize, maximumQueueSize, keepAliveTime, unit, threadPrefix, FastThreadPoolExecutor.class); + } + /** * Constructs a FastThreadPoolExecutor instance. * @@ -58,9 +87,11 @@ public class FastThreadPoolExecutor extends ThreadPoolExecutor { * the time unit for the keepAliveTime argument * @param threadPrefix * the name prefix for threads created by this executor. + * @param loggerIdentity + * the class to use as logger name for logging uncaught exceptions from the threads. */ public FastThreadPoolExecutor(final int maximumPoolSize, final int maximumQueueSize, final long keepAliveTime, - final TimeUnit unit, final String threadPrefix) { + final TimeUnit unit, final String threadPrefix, Class loggerIdentity) { // We use all core threads (the first 2 parameters below equal) so, when a task is submitted, // if the thread limit hasn't been reached, a new thread will be spawned to execute // the task even if there is an existing idle thread in the pool. This is faster than @@ -73,8 +104,8 @@ public class FastThreadPoolExecutor extends ThreadPoolExecutor { this.threadPrefix = threadPrefix; this.maximumQueueSize = maximumQueueSize; - setThreadFactory(new ThreadFactoryBuilder().setDaemon(true) - .setNameFormat(threadPrefix + "-%d").build()); + setThreadFactory(ThreadFactoryProvider.builder().namePrefix(threadPrefix) + .logger(LoggerFactory.getLogger(loggerIdentity)).build().get()); if (keepAliveTime > 0) { // Need to specifically configure core threads to timeout. diff --git a/common/util/src/main/java/org/opendaylight/yangtools/util/concurrent/LoggingThreadUncaughtExceptionHandler.java b/common/util/src/main/java/org/opendaylight/yangtools/util/concurrent/LoggingThreadUncaughtExceptionHandler.java new file mode 100644 index 0000000000..6efbd640fb --- /dev/null +++ b/common/util/src/main/java/org/opendaylight/yangtools/util/concurrent/LoggingThreadUncaughtExceptionHandler.java @@ -0,0 +1,41 @@ +/* + * Copyright (c) 2017 Red Hat, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.yangtools.util.concurrent; + +import java.lang.Thread.UncaughtExceptionHandler; +import java.util.Objects; +import org.slf4j.Logger; + +/** + * Thread's UncaughtExceptionHandler which logs to slf4j. + * + * @author Michael Vorburger.ch + */ +public final class LoggingThreadUncaughtExceptionHandler implements UncaughtExceptionHandler { + + // This class is also available in infrautils (but yangtools cannot depend on infrautils) + // as org.opendaylight.infrautils.utils.concurrent.LoggingThreadUncaughtExceptionHandler + + /** + * Factory method to obtain an instance of this bound to the passed slf4j Logger. + */ + public static UncaughtExceptionHandler toLogger(Logger logger) { + return new LoggingThreadUncaughtExceptionHandler(logger); + } + + private final Logger logger; + + private LoggingThreadUncaughtExceptionHandler(Logger logger) { + this.logger = Objects.requireNonNull(logger, "logger"); + } + + @Override + public void uncaughtException(Thread thread, Throwable throwable) { + logger.error("Thread terminated due to uncaught exception: {}", thread.getName(), throwable); + } +} diff --git a/common/util/src/main/java/org/opendaylight/yangtools/util/concurrent/SpecialExecutors.java b/common/util/src/main/java/org/opendaylight/yangtools/util/concurrent/SpecialExecutors.java index ae01b8c1d1..e6c2158ce2 100644 --- a/common/util/src/main/java/org/opendaylight/yangtools/util/concurrent/SpecialExecutors.java +++ b/common/util/src/main/java/org/opendaylight/yangtools/util/concurrent/SpecialExecutors.java @@ -46,11 +46,23 @@ public final class SpecialExecutors { * the capacity of the queue. * @param threadPrefix * the name prefix for threads created by this executor. + * @param loggerIdentity + * the class to use as logger name for logging uncaught exceptions from the threads. * @return a new ExecutorService with the specified configuration. */ + public static ExecutorService newBoundedFastThreadPool(int maximumPoolSize, + int maximumQueueSize, String threadPrefix, Class loggerIdentity) { + return new FastThreadPoolExecutor(maximumPoolSize, maximumQueueSize, threadPrefix, loggerIdentity); + } + + /** + * Deprecated variant. + * @deprecated Please use {@link #newBoundedFastThreadPool(int, int, String, Class)} instead. + */ + @Deprecated public static ExecutorService newBoundedFastThreadPool(int maximumPoolSize, int maximumQueueSize, String threadPrefix) { - return new FastThreadPoolExecutor(maximumPoolSize, maximumQueueSize, threadPrefix); + return newBoundedFastThreadPool(maximumPoolSize, maximumQueueSize, threadPrefix, SpecialExecutors.class); } /** @@ -66,17 +78,30 @@ public final class SpecialExecutors { * the capacity of the queue. * @param threadPrefix * the name prefix for threads created by this executor. + * @param loggerIdentity + * the class to use as logger name for logging uncaught exceptions from the threads. * @return a new ExecutorService with the specified configuration. */ public static ExecutorService newBlockingBoundedFastThreadPool(int maximumPoolSize, - int maximumQueueSize, String threadPrefix) { + int maximumQueueSize, String threadPrefix, Class loggerIdentity) { - FastThreadPoolExecutor executor = - new FastThreadPoolExecutor(maximumPoolSize, maximumQueueSize, threadPrefix); + FastThreadPoolExecutor executor = new FastThreadPoolExecutor(maximumPoolSize, maximumQueueSize, threadPrefix, + loggerIdentity); executor.setRejectedExecutionHandler(CountingRejectedExecutionHandler.newCallerRunsPolicy()); return executor; } + /** + * Deprecated variant. + * @deprecated Please use {@link #newBlockingBoundedFastThreadPool(int, int, String, Class)} instead. + */ + @Deprecated + public static ExecutorService newBlockingBoundedFastThreadPool(int maximumPoolSize, + int maximumQueueSize, String threadPrefix) { + return newBlockingBoundedFastThreadPool(maximumPoolSize, maximumQueueSize, threadPrefix, + SpecialExecutors.class); + } + /** * Creates an ExecutorService with a specified bounded queue capacity that favors reusing * previously constructed threads, when they are available, over creating new threads. When a @@ -104,9 +129,19 @@ public final class SpecialExecutors { * the name prefix for threads created by this executor. * @return a new ExecutorService with the specified configuration. */ + public static ExecutorService newBoundedCachedThreadPool(int maximumPoolSize, + int maximumQueueSize, String threadPrefix, Class loggerIdentity) { + return new CachedThreadPoolExecutor(maximumPoolSize, maximumQueueSize, threadPrefix, loggerIdentity); + } + + /** + * Deprecated variant. + * @deprecated Please use {@link #newBoundedCachedThreadPool(int, int, String, Class)} instead. + */ + @Deprecated public static ExecutorService newBoundedCachedThreadPool(int maximumPoolSize, int maximumQueueSize, String threadPrefix) { - return new CachedThreadPoolExecutor(maximumPoolSize, maximumQueueSize, threadPrefix); + return new CachedThreadPoolExecutor(maximumPoolSize, maximumQueueSize, threadPrefix, SpecialExecutors.class); } /** @@ -125,14 +160,25 @@ public final class SpecialExecutors { * @return a new ExecutorService with the specified configuration. */ public static ExecutorService newBlockingBoundedCachedThreadPool(int maximumPoolSize, - int maximumQueueSize, String threadPrefix) { + int maximumQueueSize, String threadPrefix, Class loggerIdentity) { - CachedThreadPoolExecutor executor = - new CachedThreadPoolExecutor(maximumPoolSize, maximumQueueSize, threadPrefix); + CachedThreadPoolExecutor executor = new CachedThreadPoolExecutor(maximumPoolSize, maximumQueueSize, + threadPrefix, loggerIdentity); executor.setRejectedExecutionHandler(CountingRejectedExecutionHandler.newCallerRunsPolicy()); return executor; } + /** + * Deprecated variant. + * @deprecated Please use {@link #newBlockingBoundedCachedThreadPool(int, int, String, Class)} instead. + */ + @Deprecated + public static ExecutorService newBlockingBoundedCachedThreadPool(int maximumPoolSize, int maximumQueueSize, + String threadPrefix) { + return newBlockingBoundedCachedThreadPool(maximumPoolSize, maximumQueueSize, threadPrefix, + SpecialExecutors.class); + } + /** * Creates an ExecutorService that uses a single worker thread operating off a bounded queue * with the specified capacity. Tasks are guaranteed to execute sequentially, and no more than @@ -143,11 +189,22 @@ public final class SpecialExecutors { * the capacity of the queue. * @param threadPrefix * the name prefix for the thread created by this executor. + * @param loggerIdentity + * the class to use as logger name for logging uncaught exceptions from the threads. * @return a new ExecutorService with the specified configuration. */ public static ExecutorService newBoundedSingleThreadExecutor(int maximumQueueSize, - String threadPrefix) { + String threadPrefix, Class loggerIdentity) { return new FastThreadPoolExecutor(1, maximumQueueSize, Long.MAX_VALUE, TimeUnit.SECONDS, - threadPrefix); + threadPrefix, loggerIdentity); + } + + /** + * Deprecated variant. + * @deprecated Please use {@link #newBoundedSingleThreadExecutor(int, String, Class)} instead. + */ + @Deprecated + public static ExecutorService newBoundedSingleThreadExecutor(int maximumQueueSize, String threadPrefix) { + return newBoundedSingleThreadExecutor(maximumQueueSize, threadPrefix, SpecialExecutors.class); } } diff --git a/common/util/src/main/java/org/opendaylight/yangtools/util/concurrent/ThreadFactoryProvider.java b/common/util/src/main/java/org/opendaylight/yangtools/util/concurrent/ThreadFactoryProvider.java new file mode 100644 index 0000000000..077e827254 --- /dev/null +++ b/common/util/src/main/java/org/opendaylight/yangtools/util/concurrent/ThreadFactoryProvider.java @@ -0,0 +1,68 @@ +/* + * Copyright (c) 2017 Red Hat, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.yangtools.util.concurrent; + +import com.google.common.util.concurrent.ThreadFactoryBuilder; +import java.util.Optional; +import java.util.concurrent.ThreadFactory; +import org.immutables.value.Value; +import org.slf4j.Logger; + +/** + * Builder for {@link ThreadFactory}. Easier to use than the + * {@link ThreadFactoryBuilder}, because it enforces setting all required + * properties through a staged builder. + * + * @author Michael Vorburger.ch + */ +@Value.Immutable +@Value.Style(stagedBuilder = true) +public abstract class ThreadFactoryProvider { + + // This class is also available in infrautils (but yangtools cannot depend on infrautils) + // as org.opendaylight.infrautils.utils.concurrent.ThreadFactoryProvider + + public static ImmutableThreadFactoryProvider.NamePrefixBuildStage builder() { + return ImmutableThreadFactoryProvider.builder(); + } + + /** + * Prefix for threads from this factory. For example, "rpc-pool", to create + * "rpc-pool-1/2/3" named threads. Note that this is a prefix, not a format, + * so you pass just "rpc-pool" instead of e.g. "rpc-pool-%d". + */ + @Value.Parameter public abstract String namePrefix(); + + /** + * Logger used to log uncaught exceptions from new threads created via this factory. + */ + @Value.Parameter public abstract Logger logger(); + + /** + * Priority for new threads from this factory. + */ + @Value.Parameter public abstract Optional priority(); + + /** + * Daemon or not for new threads created via this factory. + * NB: Defaults to true. + */ + @Value.Default public boolean daemon() { + return true; + } + + public ThreadFactory get() { + ThreadFactoryBuilder guavaBuilder = new ThreadFactoryBuilder(); + guavaBuilder.setNameFormat(namePrefix() + "-%d"); + guavaBuilder.setUncaughtExceptionHandler(LoggingThreadUncaughtExceptionHandler.toLogger(logger())); + guavaBuilder.setDaemon(daemon()); + priority().ifPresent(guavaBuilder::setPriority); + logger().info("ThreadFactory created: {}", namePrefix()); + return guavaBuilder.build(); + } +} -- 2.36.6