SpecialExecutors with LoggingThreadUncaughtExceptionHandler 05/63805/7
authorMichael Vorburger <vorburger@redhat.com>
Thu, 28 Sep 2017 21:32:19 +0000 (23:32 +0200)
committerRobert Varga <nite@hq.sk>
Thu, 19 Oct 2017 08:27:38 +0000 (08:27 +0000)
Change-Id: I133bbf474a60681f061968a8f9ed1ed32671fd6a
Signed-off-by: Michael Vorburger <vorburger@redhat.com>
common/util/pom.xml
common/util/src/main/java/org/opendaylight/yangtools/util/concurrent/CachedThreadPoolExecutor.java
common/util/src/main/java/org/opendaylight/yangtools/util/concurrent/FastThreadPoolExecutor.java
common/util/src/main/java/org/opendaylight/yangtools/util/concurrent/LoggingThreadUncaughtExceptionHandler.java [new file with mode: 0644]
common/util/src/main/java/org/opendaylight/yangtools/util/concurrent/SpecialExecutors.java
common/util/src/main/java/org/opendaylight/yangtools/util/concurrent/ThreadFactoryProvider.java [new file with mode: 0644]

index 6f0259193ff8be832027d65f0ed5330189d4c5f7..3b2b1e4787cc35045147227efa15c40aa0a5868b 100644 (file)
             <groupId>com.google.guava</groupId>
             <artifactId>guava</artifactId>
         </dependency>
+        <dependency>
+            <groupId>org.immutables</groupId>
+            <artifactId>value</artifactId>
+        </dependency>
 
         <dependency>
             <groupId>junit</groupId>
         </dependency>
     </dependencies>
 
-       <build>
-               <plugins>
-                       <plugin>
-                               <groupId>org.apache.maven.plugins</groupId>
-                               <artifactId>maven-checkstyle-plugin</artifactId>
-                               <configuration>
-                                       <propertyExpansion>checkstyle.violationSeverity=error</propertyExpansion>
-                               </configuration>
-                       </plugin>
-               </plugins>
-       </build>
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-checkstyle-plugin</artifactId>
+                <configuration>
+                    <propertyExpansion>checkstyle.violationSeverity=error</propertyExpansion>
+                </configuration>
+            </plugin>
+        </plugins>
+    </build>
 
-       <!--
-         Maven Site Configuration
+    <!--
+      Maven Site Configuration
 
-         The following configuration is necessary for maven-site-plugin to
-         correctly identify the correct deployment path for OpenDaylight Maven
-         sites.
+      The following configuration is necessary for maven-site-plugin to
+      correctly identify the correct deployment path for OpenDaylight Maven
+      sites.
       -->
     <url>${odl.site.url}/${project.groupId}/${stream}/${project.artifactId}/</url>
 
-       <distributionManagement>
-               <site>
-                       <id>opendaylight-site</id>
-                       <url>${nexus.site.url}/${project.artifactId}/</url>
-               </site>
-       </distributionManagement>
+    <distributionManagement>
+        <site>
+            <id>opendaylight-site</id>
+            <url>${nexus.site.url}/${project.artifactId}/</url>
+        </site>
+    </distributionManagement>
 </project>
index 141908050971cb9ea953d0e4935a2e272268dc3e..cff95ff221075f23920ca1004a09e954e2852dce 100644 (file)
@@ -11,7 +11,6 @@ import static java.util.Objects.requireNonNull;
 
 import com.google.common.base.MoreObjects;
 import com.google.common.base.MoreObjects.ToStringHelper;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.RejectedExecutionException;
@@ -19,6 +18,7 @@ import java.util.concurrent.RejectedExecutionHandler;
 import java.util.concurrent.SynchronousQueue;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
+import org.slf4j.LoggerFactory;
 
 /**
  * A ThreadPoolExecutor with a specified bounded queue capacity that favors reusing previously
@@ -50,8 +50,11 @@ public class CachedThreadPoolExecutor extends ThreadPoolExecutor {
      *            the capacity of the queue.
      * @param threadPrefix
      *            the name prefix for threads created by this executor.
+     * @param loggerIdentity
+     *               the class to use as logger name for logging uncaught exceptions from the threads.
      */
-    public CachedThreadPoolExecutor(final int maximumPoolSize, final int maximumQueueSize, final String threadPrefix) {
+    public CachedThreadPoolExecutor(final int maximumPoolSize, final int maximumQueueSize, final String threadPrefix,
+            Class<?> loggerIdentity) {
         // We're using a custom SynchronousQueue that has a backing bounded LinkedBlockingQueue.
         // We don't specify any core threads (first parameter) so, when a task is submitted,
         // the base class will always try to offer to the queue. If there is an existing waiting
@@ -67,8 +70,8 @@ public class CachedThreadPoolExecutor extends ThreadPoolExecutor {
         this.threadPrefix = requireNonNull(threadPrefix);
         this.maximumQueueSize = maximumQueueSize;
 
-        setThreadFactory(new ThreadFactoryBuilder().setDaemon(true)
-                                            .setNameFormat(this.threadPrefix + "-%d").build());
+        setThreadFactory(ThreadFactoryProvider.builder().namePrefix(threadPrefix)
+                .logger(LoggerFactory.getLogger(loggerIdentity)).build().get());
 
         executorQueue = (ExecutorQueue)super.getQueue();
 
@@ -77,6 +80,15 @@ public class CachedThreadPoolExecutor extends ThreadPoolExecutor {
         super.setRejectedExecutionHandler(rejectedTaskHandler);
     }
 
+    /**
+     * Constructor.
+     * @deprecated Please use {@link #CachedThreadPoolExecutor(int, int, String, Class)} instead.
+     */
+    @Deprecated
+    public CachedThreadPoolExecutor(final int maximumPoolSize, final int maximumQueueSize, final String threadPrefix) {
+        this(maximumPoolSize, maximumQueueSize, threadPrefix, CachedThreadPoolExecutor.class);
+    }
+
     @Override
     public void setRejectedExecutionHandler(final RejectedExecutionHandler handler) {
         rejectedTaskHandler.setDelegateRejectedExecutionHandler(requireNonNull(handler));
index f5e6a1803f67e6093fc87ed441bbfda17a596b89..b317944b69a220d54be3ed1137911b8c45cad7a3 100644 (file)
@@ -10,9 +10,9 @@ package org.opendaylight.yangtools.util.concurrent;
 
 import com.google.common.base.MoreObjects;
 import com.google.common.base.MoreObjects.ToStringHelper;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
+import org.slf4j.LoggerFactory;
 
 /**
  * A ThreadPoolExecutor with a specified bounded queue capacity that favors creating new threads
@@ -39,12 +39,41 @@ public class FastThreadPoolExecutor extends ThreadPoolExecutor {
      *            the capacity of the queue.
      * @param threadPrefix
      *            the name prefix for threads created by this executor.
+     * @param loggerIdentity
+     *               the class to use as logger name for logging uncaught exceptions from the threads.
      */
+    public FastThreadPoolExecutor(final int maximumPoolSize, final int maximumQueueSize, final String threadPrefix,
+            Class<?> loggerIdentity) {
+        this(maximumPoolSize, maximumQueueSize, DEFAULT_IDLE_TIMEOUT_IN_SEC, TimeUnit.SECONDS,
+              threadPrefix, loggerIdentity);
+    }
+
+    /**
+     * Constructor.
+     *
+     * @deprecated Please use
+     *             {@link #FastThreadPoolExecutor(int, int, String, Class)}
+     *             instead.
+     */
+    @Deprecated
     public FastThreadPoolExecutor(final int maximumPoolSize, final int maximumQueueSize, final String threadPrefix) {
         this(maximumPoolSize, maximumQueueSize, DEFAULT_IDLE_TIMEOUT_IN_SEC, TimeUnit.SECONDS,
               threadPrefix);
     }
 
+    /**
+     * Constructor.
+     *
+     * @deprecated Please use
+     *             {@link #FastThreadPoolExecutor(int, int, long, TimeUnit, String, Class)}
+     *             instead.
+     */
+    @Deprecated
+    public FastThreadPoolExecutor(final int maximumPoolSize, final int maximumQueueSize, final long keepAliveTime,
+            final TimeUnit unit, final String threadPrefix) {
+        this(maximumPoolSize, maximumQueueSize, keepAliveTime, unit, threadPrefix, FastThreadPoolExecutor.class);
+    }
+
     /**
      * Constructs a FastThreadPoolExecutor instance.
      *
@@ -58,9 +87,11 @@ public class FastThreadPoolExecutor extends ThreadPoolExecutor {
      *            the time unit for the keepAliveTime argument
      * @param threadPrefix
      *            the name prefix for threads created by this executor.
+     * @param loggerIdentity
+     *               the class to use as logger name for logging uncaught exceptions from the threads.
      */
     public FastThreadPoolExecutor(final int maximumPoolSize, final int maximumQueueSize, final long keepAliveTime,
-            final TimeUnit unit, final String threadPrefix) {
+            final TimeUnit unit, final String threadPrefix, Class<?> loggerIdentity) {
         // We use all core threads (the first 2 parameters below equal) so, when a task is submitted,
         // if the thread limit hasn't been reached, a new thread will be spawned to execute
         // the task even if there is an existing idle thread in the pool. This is faster than
@@ -73,8 +104,8 @@ public class FastThreadPoolExecutor extends ThreadPoolExecutor {
         this.threadPrefix = threadPrefix;
         this.maximumQueueSize = maximumQueueSize;
 
-        setThreadFactory(new ThreadFactoryBuilder().setDaemon(true)
-                                                 .setNameFormat(threadPrefix + "-%d").build());
+        setThreadFactory(ThreadFactoryProvider.builder().namePrefix(threadPrefix)
+                .logger(LoggerFactory.getLogger(loggerIdentity)).build().get());
 
         if (keepAliveTime > 0) {
             // Need to specifically configure core threads to timeout.
diff --git a/common/util/src/main/java/org/opendaylight/yangtools/util/concurrent/LoggingThreadUncaughtExceptionHandler.java b/common/util/src/main/java/org/opendaylight/yangtools/util/concurrent/LoggingThreadUncaughtExceptionHandler.java
new file mode 100644 (file)
index 0000000..6efbd64
--- /dev/null
@@ -0,0 +1,41 @@
+/*
+ * Copyright (c) 2017 Red Hat, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.yangtools.util.concurrent;
+
+import java.lang.Thread.UncaughtExceptionHandler;
+import java.util.Objects;
+import org.slf4j.Logger;
+
+/**
+ * Thread's UncaughtExceptionHandler which logs to slf4j.
+ *
+ * @author Michael Vorburger.ch
+ */
+public final class LoggingThreadUncaughtExceptionHandler implements UncaughtExceptionHandler {
+
+    // This class is also available in infrautils (but yangtools cannot depend on infrautils)
+    // as org.opendaylight.infrautils.utils.concurrent.LoggingThreadUncaughtExceptionHandler
+
+    /**
+     * Factory method to obtain an instance of this bound to the passed slf4j Logger.
+     */
+    public static UncaughtExceptionHandler toLogger(Logger logger) {
+        return new LoggingThreadUncaughtExceptionHandler(logger);
+    }
+
+    private final Logger logger;
+
+    private LoggingThreadUncaughtExceptionHandler(Logger logger) {
+        this.logger = Objects.requireNonNull(logger, "logger");
+    }
+
+    @Override
+    public void uncaughtException(Thread thread, Throwable throwable) {
+        logger.error("Thread terminated due to uncaught exception: {}", thread.getName(), throwable);
+    }
+}
index ae01b8c1d101d852efe51901b846f59625148c9e..e6c2158ce26ac0732f6d1ef3d746386a3a19cfd2 100644 (file)
@@ -46,11 +46,23 @@ public final class SpecialExecutors {
      *            the capacity of the queue.
      * @param threadPrefix
      *            the name prefix for threads created by this executor.
+     * @param loggerIdentity
+     *               the class to use as logger name for logging uncaught exceptions from the threads.
      * @return a new ExecutorService with the specified configuration.
      */
+    public static ExecutorService newBoundedFastThreadPool(int maximumPoolSize,
+            int maximumQueueSize, String threadPrefix, Class<?> loggerIdentity) {
+        return new FastThreadPoolExecutor(maximumPoolSize, maximumQueueSize, threadPrefix, loggerIdentity);
+    }
+
+    /**
+     * Deprecated variant.
+     * @deprecated Please use {@link #newBoundedFastThreadPool(int, int, String, Class)} instead.
+     */
+    @Deprecated
     public static ExecutorService newBoundedFastThreadPool(int maximumPoolSize,
             int maximumQueueSize, String threadPrefix) {
-        return new FastThreadPoolExecutor(maximumPoolSize, maximumQueueSize, threadPrefix);
+        return newBoundedFastThreadPool(maximumPoolSize, maximumQueueSize, threadPrefix, SpecialExecutors.class);
     }
 
     /**
@@ -66,17 +78,30 @@ public final class SpecialExecutors {
      *            the capacity of the queue.
      * @param threadPrefix
      *            the name prefix for threads created by this executor.
+     * @param loggerIdentity
+     *               the class to use as logger name for logging uncaught exceptions from the threads.
      * @return a new ExecutorService with the specified configuration.
      */
     public static ExecutorService newBlockingBoundedFastThreadPool(int maximumPoolSize,
-            int maximumQueueSize, String threadPrefix) {
+            int maximumQueueSize, String threadPrefix, Class<?> loggerIdentity) {
 
-        FastThreadPoolExecutor executor =
-                new FastThreadPoolExecutor(maximumPoolSize, maximumQueueSize, threadPrefix);
+        FastThreadPoolExecutor executor = new FastThreadPoolExecutor(maximumPoolSize, maximumQueueSize, threadPrefix,
+                loggerIdentity);
         executor.setRejectedExecutionHandler(CountingRejectedExecutionHandler.newCallerRunsPolicy());
         return executor;
     }
 
+    /**
+     * Deprecated variant.
+     * @deprecated Please use {@link #newBlockingBoundedFastThreadPool(int, int, String, Class)} instead.
+     */
+    @Deprecated
+    public static ExecutorService newBlockingBoundedFastThreadPool(int maximumPoolSize,
+            int maximumQueueSize, String threadPrefix) {
+        return newBlockingBoundedFastThreadPool(maximumPoolSize, maximumQueueSize, threadPrefix,
+                SpecialExecutors.class);
+    }
+
     /**
      * Creates an ExecutorService with a specified bounded queue capacity that favors reusing
      * previously constructed threads, when they are available, over creating new threads. When a
@@ -104,9 +129,19 @@ public final class SpecialExecutors {
      *            the name prefix for threads created by this executor.
      * @return a new ExecutorService with the specified configuration.
      */
+    public static ExecutorService newBoundedCachedThreadPool(int maximumPoolSize,
+            int maximumQueueSize, String threadPrefix, Class<?> loggerIdentity) {
+        return new CachedThreadPoolExecutor(maximumPoolSize, maximumQueueSize, threadPrefix, loggerIdentity);
+    }
+
+    /**
+     * Deprecated variant.
+     * @deprecated Please use {@link #newBoundedCachedThreadPool(int, int, String, Class)} instead.
+     */
+    @Deprecated
     public static ExecutorService newBoundedCachedThreadPool(int maximumPoolSize,
             int maximumQueueSize, String threadPrefix) {
-        return new CachedThreadPoolExecutor(maximumPoolSize, maximumQueueSize, threadPrefix);
+        return new CachedThreadPoolExecutor(maximumPoolSize, maximumQueueSize, threadPrefix, SpecialExecutors.class);
     }
 
     /**
@@ -125,14 +160,25 @@ public final class SpecialExecutors {
      * @return a new ExecutorService with the specified configuration.
      */
     public static ExecutorService newBlockingBoundedCachedThreadPool(int maximumPoolSize,
-            int maximumQueueSize, String threadPrefix) {
+            int maximumQueueSize, String threadPrefix, Class<?> loggerIdentity) {
 
-        CachedThreadPoolExecutor executor =
-                new CachedThreadPoolExecutor(maximumPoolSize, maximumQueueSize, threadPrefix);
+        CachedThreadPoolExecutor executor = new CachedThreadPoolExecutor(maximumPoolSize, maximumQueueSize,
+                threadPrefix, loggerIdentity);
         executor.setRejectedExecutionHandler(CountingRejectedExecutionHandler.newCallerRunsPolicy());
         return executor;
     }
 
+    /**
+     * Deprecated variant.
+     * @deprecated Please use {@link #newBlockingBoundedCachedThreadPool(int, int, String, Class)} instead.
+     */
+    @Deprecated
+    public static ExecutorService newBlockingBoundedCachedThreadPool(int maximumPoolSize, int maximumQueueSize,
+            String threadPrefix) {
+        return newBlockingBoundedCachedThreadPool(maximumPoolSize, maximumQueueSize, threadPrefix,
+                SpecialExecutors.class);
+    }
+
     /**
      * Creates an ExecutorService that uses a single worker thread operating off a bounded queue
      * with the specified capacity. Tasks are guaranteed to execute sequentially, and no more than
@@ -143,11 +189,22 @@ public final class SpecialExecutors {
      *            the capacity of the queue.
      * @param threadPrefix
      *            the name prefix for the thread created by this executor.
+     * @param loggerIdentity
+     *               the class to use as logger name for logging uncaught exceptions from the threads.
      * @return a new ExecutorService with the specified configuration.
      */
     public static ExecutorService newBoundedSingleThreadExecutor(int maximumQueueSize,
-            String threadPrefix) {
+            String threadPrefix, Class<?> loggerIdentity) {
         return new FastThreadPoolExecutor(1, maximumQueueSize, Long.MAX_VALUE, TimeUnit.SECONDS,
-                threadPrefix);
+                threadPrefix, loggerIdentity);
+    }
+
+    /**
+     * Deprecated variant.
+     * @deprecated Please use {@link #newBoundedSingleThreadExecutor(int, String, Class)} instead.
+     */
+    @Deprecated
+    public static ExecutorService newBoundedSingleThreadExecutor(int maximumQueueSize, String threadPrefix) {
+        return newBoundedSingleThreadExecutor(maximumQueueSize, threadPrefix, SpecialExecutors.class);
     }
 }
diff --git a/common/util/src/main/java/org/opendaylight/yangtools/util/concurrent/ThreadFactoryProvider.java b/common/util/src/main/java/org/opendaylight/yangtools/util/concurrent/ThreadFactoryProvider.java
new file mode 100644 (file)
index 0000000..077e827
--- /dev/null
@@ -0,0 +1,68 @@
+/*
+ * Copyright (c) 2017 Red Hat, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.yangtools.util.concurrent;
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import java.util.Optional;
+import java.util.concurrent.ThreadFactory;
+import org.immutables.value.Value;
+import org.slf4j.Logger;
+
+/**
+ * Builder for {@link ThreadFactory}. Easier to use than the
+ * {@link ThreadFactoryBuilder}, because it enforces setting all required
+ * properties through a staged builder.
+ *
+ * @author Michael Vorburger.ch
+ */
+@Value.Immutable
+@Value.Style(stagedBuilder = true)
+public abstract class ThreadFactoryProvider {
+
+    // This class is also available in infrautils (but yangtools cannot depend on infrautils)
+    // as org.opendaylight.infrautils.utils.concurrent.ThreadFactoryProvider
+
+    public static ImmutableThreadFactoryProvider.NamePrefixBuildStage builder() {
+        return ImmutableThreadFactoryProvider.builder();
+    }
+
+    /**
+     * Prefix for threads from this factory. For example, "rpc-pool", to create
+     * "rpc-pool-1/2/3" named threads. Note that this is a prefix, not a format,
+     * so you pass just "rpc-pool" instead of e.g. "rpc-pool-%d".
+     */
+    @Value.Parameter public abstract String namePrefix();
+
+    /**
+     * Logger used to log uncaught exceptions from new threads created via this factory.
+     */
+    @Value.Parameter public abstract Logger logger();
+
+    /**
+     * Priority for new threads from this factory.
+     */
+    @Value.Parameter public abstract Optional<Integer> priority();
+
+    /**
+     * Daemon or not for new threads created via this factory.
+     * <b>NB: Defaults to true.</b>
+     */
+    @Value.Default public boolean daemon() {
+        return true;
+    }
+
+    public ThreadFactory get() {
+        ThreadFactoryBuilder guavaBuilder = new ThreadFactoryBuilder();
+        guavaBuilder.setNameFormat(namePrefix() + "-%d");
+        guavaBuilder.setUncaughtExceptionHandler(LoggingThreadUncaughtExceptionHandler.toLogger(logger()));
+        guavaBuilder.setDaemon(daemon());
+        priority().ifPresent(guavaBuilder::setPriority);
+        logger().info("ThreadFactory created: {}", namePrefix());
+        return guavaBuilder.build();
+    }
+}