<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
</dependency>
+ <dependency>
+ <groupId>org.immutables</groupId>
+ <artifactId>value</artifactId>
+ </dependency>
<dependency>
<groupId>junit</groupId>
</dependency>
</dependencies>
- <build>
- <plugins>
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-checkstyle-plugin</artifactId>
- <configuration>
- <propertyExpansion>checkstyle.violationSeverity=error</propertyExpansion>
- </configuration>
- </plugin>
- </plugins>
- </build>
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-checkstyle-plugin</artifactId>
+ <configuration>
+ <propertyExpansion>checkstyle.violationSeverity=error</propertyExpansion>
+ </configuration>
+ </plugin>
+ </plugins>
+ </build>
- <!--
- Maven Site Configuration
+ <!--
+ Maven Site Configuration
- The following configuration is necessary for maven-site-plugin to
- correctly identify the correct deployment path for OpenDaylight Maven
- sites.
+ The following configuration is necessary for maven-site-plugin to
+ correctly identify the correct deployment path for OpenDaylight Maven
+ sites.
-->
<url>${odl.site.url}/${project.groupId}/${stream}/${project.artifactId}/</url>
- <distributionManagement>
- <site>
- <id>opendaylight-site</id>
- <url>${nexus.site.url}/${project.artifactId}/</url>
- </site>
- </distributionManagement>
+ <distributionManagement>
+ <site>
+ <id>opendaylight-site</id>
+ <url>${nexus.site.url}/${project.artifactId}/</url>
+ </site>
+ </distributionManagement>
</project>
import com.google.common.base.MoreObjects;
import com.google.common.base.MoreObjects.ToStringHelper;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
+import org.slf4j.LoggerFactory;
/**
* A ThreadPoolExecutor with a specified bounded queue capacity that favors reusing previously
* the capacity of the queue.
* @param threadPrefix
* the name prefix for threads created by this executor.
+ * @param loggerIdentity
+ * the class to use as logger name for logging uncaught exceptions from the threads.
*/
- public CachedThreadPoolExecutor(final int maximumPoolSize, final int maximumQueueSize, final String threadPrefix) {
+ public CachedThreadPoolExecutor(final int maximumPoolSize, final int maximumQueueSize, final String threadPrefix,
+ Class<?> loggerIdentity) {
// We're using a custom SynchronousQueue that has a backing bounded LinkedBlockingQueue.
// We don't specify any core threads (first parameter) so, when a task is submitted,
// the base class will always try to offer to the queue. If there is an existing waiting
this.threadPrefix = requireNonNull(threadPrefix);
this.maximumQueueSize = maximumQueueSize;
- setThreadFactory(new ThreadFactoryBuilder().setDaemon(true)
- .setNameFormat(this.threadPrefix + "-%d").build());
+ setThreadFactory(ThreadFactoryProvider.builder().namePrefix(threadPrefix)
+ .logger(LoggerFactory.getLogger(loggerIdentity)).build().get());
executorQueue = (ExecutorQueue)super.getQueue();
super.setRejectedExecutionHandler(rejectedTaskHandler);
}
+ /**
+ * Constructor.
+ * @deprecated Please use {@link #CachedThreadPoolExecutor(int, int, String, Class)} instead.
+ */
+ @Deprecated
+ public CachedThreadPoolExecutor(final int maximumPoolSize, final int maximumQueueSize, final String threadPrefix) {
+ this(maximumPoolSize, maximumQueueSize, threadPrefix, CachedThreadPoolExecutor.class);
+ }
+
@Override
public void setRejectedExecutionHandler(final RejectedExecutionHandler handler) {
rejectedTaskHandler.setDelegateRejectedExecutionHandler(requireNonNull(handler));
import com.google.common.base.MoreObjects;
import com.google.common.base.MoreObjects.ToStringHelper;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
+import org.slf4j.LoggerFactory;
/**
* A ThreadPoolExecutor with a specified bounded queue capacity that favors creating new threads
* the capacity of the queue.
* @param threadPrefix
* the name prefix for threads created by this executor.
+ * @param loggerIdentity
+ * the class to use as logger name for logging uncaught exceptions from the threads.
*/
+ public FastThreadPoolExecutor(final int maximumPoolSize, final int maximumQueueSize, final String threadPrefix,
+ Class<?> loggerIdentity) {
+ this(maximumPoolSize, maximumQueueSize, DEFAULT_IDLE_TIMEOUT_IN_SEC, TimeUnit.SECONDS,
+ threadPrefix, loggerIdentity);
+ }
+
+ /**
+ * Constructor.
+ *
+ * @deprecated Please use
+ * {@link #FastThreadPoolExecutor(int, int, String, Class)}
+ * instead.
+ */
+ @Deprecated
public FastThreadPoolExecutor(final int maximumPoolSize, final int maximumQueueSize, final String threadPrefix) {
this(maximumPoolSize, maximumQueueSize, DEFAULT_IDLE_TIMEOUT_IN_SEC, TimeUnit.SECONDS,
threadPrefix);
}
+ /**
+ * Constructor.
+ *
+ * @deprecated Please use
+ * {@link #FastThreadPoolExecutor(int, int, long, TimeUnit, String, Class)}
+ * instead.
+ */
+ @Deprecated
+ public FastThreadPoolExecutor(final int maximumPoolSize, final int maximumQueueSize, final long keepAliveTime,
+ final TimeUnit unit, final String threadPrefix) {
+ this(maximumPoolSize, maximumQueueSize, keepAliveTime, unit, threadPrefix, FastThreadPoolExecutor.class);
+ }
+
/**
* Constructs a FastThreadPoolExecutor instance.
*
* the time unit for the keepAliveTime argument
* @param threadPrefix
* the name prefix for threads created by this executor.
+ * @param loggerIdentity
+ * the class to use as logger name for logging uncaught exceptions from the threads.
*/
public FastThreadPoolExecutor(final int maximumPoolSize, final int maximumQueueSize, final long keepAliveTime,
- final TimeUnit unit, final String threadPrefix) {
+ final TimeUnit unit, final String threadPrefix, Class<?> loggerIdentity) {
// We use all core threads (the first 2 parameters below equal) so, when a task is submitted,
// if the thread limit hasn't been reached, a new thread will be spawned to execute
// the task even if there is an existing idle thread in the pool. This is faster than
this.threadPrefix = threadPrefix;
this.maximumQueueSize = maximumQueueSize;
- setThreadFactory(new ThreadFactoryBuilder().setDaemon(true)
- .setNameFormat(threadPrefix + "-%d").build());
+ setThreadFactory(ThreadFactoryProvider.builder().namePrefix(threadPrefix)
+ .logger(LoggerFactory.getLogger(loggerIdentity)).build().get());
if (keepAliveTime > 0) {
// Need to specifically configure core threads to timeout.
--- /dev/null
+/*
+ * Copyright (c) 2017 Red Hat, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.yangtools.util.concurrent;
+
+import java.lang.Thread.UncaughtExceptionHandler;
+import java.util.Objects;
+import org.slf4j.Logger;
+
+/**
+ * Thread's UncaughtExceptionHandler which logs to slf4j.
+ *
+ * @author Michael Vorburger.ch
+ */
+public final class LoggingThreadUncaughtExceptionHandler implements UncaughtExceptionHandler {
+
+ // This class is also available in infrautils (but yangtools cannot depend on infrautils)
+ // as org.opendaylight.infrautils.utils.concurrent.LoggingThreadUncaughtExceptionHandler
+
+ /**
+ * Factory method to obtain an instance of this bound to the passed slf4j Logger.
+ */
+ public static UncaughtExceptionHandler toLogger(Logger logger) {
+ return new LoggingThreadUncaughtExceptionHandler(logger);
+ }
+
+ private final Logger logger;
+
+ private LoggingThreadUncaughtExceptionHandler(Logger logger) {
+ this.logger = Objects.requireNonNull(logger, "logger");
+ }
+
+ @Override
+ public void uncaughtException(Thread thread, Throwable throwable) {
+ logger.error("Thread terminated due to uncaught exception: {}", thread.getName(), throwable);
+ }
+}
* the capacity of the queue.
* @param threadPrefix
* the name prefix for threads created by this executor.
+ * @param loggerIdentity
+ * the class to use as logger name for logging uncaught exceptions from the threads.
* @return a new ExecutorService with the specified configuration.
*/
+ public static ExecutorService newBoundedFastThreadPool(int maximumPoolSize,
+ int maximumQueueSize, String threadPrefix, Class<?> loggerIdentity) {
+ return new FastThreadPoolExecutor(maximumPoolSize, maximumQueueSize, threadPrefix, loggerIdentity);
+ }
+
+ /**
+ * Deprecated variant.
+ * @deprecated Please use {@link #newBoundedFastThreadPool(int, int, String, Class)} instead.
+ */
+ @Deprecated
public static ExecutorService newBoundedFastThreadPool(int maximumPoolSize,
int maximumQueueSize, String threadPrefix) {
- return new FastThreadPoolExecutor(maximumPoolSize, maximumQueueSize, threadPrefix);
+ return newBoundedFastThreadPool(maximumPoolSize, maximumQueueSize, threadPrefix, SpecialExecutors.class);
}
/**
* the capacity of the queue.
* @param threadPrefix
* the name prefix for threads created by this executor.
+ * @param loggerIdentity
+ * the class to use as logger name for logging uncaught exceptions from the threads.
* @return a new ExecutorService with the specified configuration.
*/
public static ExecutorService newBlockingBoundedFastThreadPool(int maximumPoolSize,
- int maximumQueueSize, String threadPrefix) {
+ int maximumQueueSize, String threadPrefix, Class<?> loggerIdentity) {
- FastThreadPoolExecutor executor =
- new FastThreadPoolExecutor(maximumPoolSize, maximumQueueSize, threadPrefix);
+ FastThreadPoolExecutor executor = new FastThreadPoolExecutor(maximumPoolSize, maximumQueueSize, threadPrefix,
+ loggerIdentity);
executor.setRejectedExecutionHandler(CountingRejectedExecutionHandler.newCallerRunsPolicy());
return executor;
}
+ /**
+ * Deprecated variant.
+ * @deprecated Please use {@link #newBlockingBoundedFastThreadPool(int, int, String, Class)} instead.
+ */
+ @Deprecated
+ public static ExecutorService newBlockingBoundedFastThreadPool(int maximumPoolSize,
+ int maximumQueueSize, String threadPrefix) {
+ return newBlockingBoundedFastThreadPool(maximumPoolSize, maximumQueueSize, threadPrefix,
+ SpecialExecutors.class);
+ }
+
/**
* Creates an ExecutorService with a specified bounded queue capacity that favors reusing
* previously constructed threads, when they are available, over creating new threads. When a
* the name prefix for threads created by this executor.
* @return a new ExecutorService with the specified configuration.
*/
+ public static ExecutorService newBoundedCachedThreadPool(int maximumPoolSize,
+ int maximumQueueSize, String threadPrefix, Class<?> loggerIdentity) {
+ return new CachedThreadPoolExecutor(maximumPoolSize, maximumQueueSize, threadPrefix, loggerIdentity);
+ }
+
+ /**
+ * Deprecated variant.
+ * @deprecated Please use {@link #newBoundedCachedThreadPool(int, int, String, Class)} instead.
+ */
+ @Deprecated
public static ExecutorService newBoundedCachedThreadPool(int maximumPoolSize,
int maximumQueueSize, String threadPrefix) {
- return new CachedThreadPoolExecutor(maximumPoolSize, maximumQueueSize, threadPrefix);
+ return new CachedThreadPoolExecutor(maximumPoolSize, maximumQueueSize, threadPrefix, SpecialExecutors.class);
}
/**
* @return a new ExecutorService with the specified configuration.
*/
public static ExecutorService newBlockingBoundedCachedThreadPool(int maximumPoolSize,
- int maximumQueueSize, String threadPrefix) {
+ int maximumQueueSize, String threadPrefix, Class<?> loggerIdentity) {
- CachedThreadPoolExecutor executor =
- new CachedThreadPoolExecutor(maximumPoolSize, maximumQueueSize, threadPrefix);
+ CachedThreadPoolExecutor executor = new CachedThreadPoolExecutor(maximumPoolSize, maximumQueueSize,
+ threadPrefix, loggerIdentity);
executor.setRejectedExecutionHandler(CountingRejectedExecutionHandler.newCallerRunsPolicy());
return executor;
}
+ /**
+ * Deprecated variant.
+ * @deprecated Please use {@link #newBlockingBoundedCachedThreadPool(int, int, String, Class)} instead.
+ */
+ @Deprecated
+ public static ExecutorService newBlockingBoundedCachedThreadPool(int maximumPoolSize, int maximumQueueSize,
+ String threadPrefix) {
+ return newBlockingBoundedCachedThreadPool(maximumPoolSize, maximumQueueSize, threadPrefix,
+ SpecialExecutors.class);
+ }
+
/**
* Creates an ExecutorService that uses a single worker thread operating off a bounded queue
* with the specified capacity. Tasks are guaranteed to execute sequentially, and no more than
* the capacity of the queue.
* @param threadPrefix
* the name prefix for the thread created by this executor.
+ * @param loggerIdentity
+ * the class to use as logger name for logging uncaught exceptions from the threads.
* @return a new ExecutorService with the specified configuration.
*/
public static ExecutorService newBoundedSingleThreadExecutor(int maximumQueueSize,
- String threadPrefix) {
+ String threadPrefix, Class<?> loggerIdentity) {
return new FastThreadPoolExecutor(1, maximumQueueSize, Long.MAX_VALUE, TimeUnit.SECONDS,
- threadPrefix);
+ threadPrefix, loggerIdentity);
+ }
+
+ /**
+ * Deprecated variant.
+ * @deprecated Please use {@link #newBoundedSingleThreadExecutor(int, String, Class)} instead.
+ */
+ @Deprecated
+ public static ExecutorService newBoundedSingleThreadExecutor(int maximumQueueSize, String threadPrefix) {
+ return newBoundedSingleThreadExecutor(maximumQueueSize, threadPrefix, SpecialExecutors.class);
}
}
--- /dev/null
+/*
+ * Copyright (c) 2017 Red Hat, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.yangtools.util.concurrent;
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import java.util.Optional;
+import java.util.concurrent.ThreadFactory;
+import org.immutables.value.Value;
+import org.slf4j.Logger;
+
+/**
+ * Builder for {@link ThreadFactory}. Easier to use than the
+ * {@link ThreadFactoryBuilder}, because it enforces setting all required
+ * properties through a staged builder.
+ *
+ * @author Michael Vorburger.ch
+ */
+@Value.Immutable
+@Value.Style(stagedBuilder = true)
+public abstract class ThreadFactoryProvider {
+
+ // This class is also available in infrautils (but yangtools cannot depend on infrautils)
+ // as org.opendaylight.infrautils.utils.concurrent.ThreadFactoryProvider
+
+ public static ImmutableThreadFactoryProvider.NamePrefixBuildStage builder() {
+ return ImmutableThreadFactoryProvider.builder();
+ }
+
+ /**
+ * Prefix for threads from this factory. For example, "rpc-pool", to create
+ * "rpc-pool-1/2/3" named threads. Note that this is a prefix, not a format,
+ * so you pass just "rpc-pool" instead of e.g. "rpc-pool-%d".
+ */
+ @Value.Parameter public abstract String namePrefix();
+
+ /**
+ * Logger used to log uncaught exceptions from new threads created via this factory.
+ */
+ @Value.Parameter public abstract Logger logger();
+
+ /**
+ * Priority for new threads from this factory.
+ */
+ @Value.Parameter public abstract Optional<Integer> priority();
+
+ /**
+ * Daemon or not for new threads created via this factory.
+ * <b>NB: Defaults to true.</b>
+ */
+ @Value.Default public boolean daemon() {
+ return true;
+ }
+
+ public ThreadFactory get() {
+ ThreadFactoryBuilder guavaBuilder = new ThreadFactoryBuilder();
+ guavaBuilder.setNameFormat(namePrefix() + "-%d");
+ guavaBuilder.setUncaughtExceptionHandler(LoggingThreadUncaughtExceptionHandler.toLogger(logger()));
+ guavaBuilder.setDaemon(daemon());
+ priority().ifPresent(guavaBuilder::setPriority);
+ logger().info("ThreadFactory created: {}", namePrefix());
+ return guavaBuilder.build();
+ }
+}