import static com.google.common.base.Preconditions.checkNotNull;
import com.google.common.base.Function;
-import com.google.common.util.concurrent.AbstractListeningExecutorService;
import com.google.common.util.concurrent.ForwardingListenableFuture;
import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.ListenableFutureTask;
-
-import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
+import javax.annotation.Nullable;
+
/**
* An implementation of ListeningExecutorService that attempts to detect deadlock scenarios that
* could occur if clients invoke the returned Future's <ode>get</code> methods synchronously.
*
* @author Thomas Pantelis
*/
-public class DeadlockDetectingListeningExecutorService extends AbstractListeningExecutorService {
+public class DeadlockDetectingListeningExecutorService extends AsyncNotifyingListeningExecutorService {
private final ThreadLocal<Boolean> deadlockDetector = new ThreadLocal<>();
private final Function<Void, Exception> deadlockExceptionFunction;
- private final ExecutorService delegate;
/**
* Constructor.
* @param deadlockExceptionFunction Function that returns an Exception instance to set as the
* cause of the ExecutionException when a deadlock is detected.
*/
- public DeadlockDetectingListeningExecutorService(final ExecutorService delegate,
- final Function<Void,Exception> deadlockExceptionFunction) {
- this.delegate = checkNotNull(delegate);
- this.deadlockExceptionFunction = checkNotNull(deadlockExceptionFunction);
- }
-
- @Override
- public boolean awaitTermination(final long timeout, final TimeUnit unit) throws InterruptedException {
- return delegate.awaitTermination(timeout, unit);
- }
-
- @Override
- public boolean isShutdown() {
- return delegate.isShutdown();
- }
-
- @Override
- public boolean isTerminated() {
- return delegate.isTerminated();
+ public DeadlockDetectingListeningExecutorService( ExecutorService delegate,
+ Function<Void,Exception> deadlockExceptionFunction ) {
+ this(delegate, deadlockExceptionFunction, null);
}
- @Override
- public void shutdown() {
- delegate.shutdown();
- }
-
- @Override
- public List<Runnable> shutdownNow() {
- return delegate.shutdownNow();
+ /**
+ * Constructor.
+ *
+ * @param delegate the backing ExecutorService.
+ * @param deadlockExceptionFunction Function that returns an Exception instance to set as the
+ * cause of the ExecutionException when a deadlock is detected.
+ * @param listenableFutureExecutor the executor used to run listener callbacks asynchronously.
+ * If null, no executor is used.
+ */
+ public DeadlockDetectingListeningExecutorService( ExecutorService delegate,
+ Function<Void,Exception> deadlockExceptionFunction,
+ @Nullable Executor listenableFutureExecutor ) {
+ super(delegate, listenableFutureExecutor);
+ this.deadlockExceptionFunction = checkNotNull(deadlockExceptionFunction);
}
@Override
- public void execute(final Runnable command) {
- delegate.execute(wrapRunnable(command));
+ public void execute( Runnable command ){
+ getDelegate().execute(wrapRunnable(command));
}
@Override
- public <T> ListenableFuture<T> submit(final Callable<T> task ) {
- final ListenableFutureTask<T> futureTask = ListenableFutureTask.create(wrapCallable(task));
- delegate.execute(futureTask);
- return wrapListenableFuture(futureTask);
+ public <T> ListenableFuture<T> submit( Callable<T> task ){
+ return wrapListenableFuture(super.submit(wrapCallable(task)));
}
@Override
- public ListenableFuture<?> submit( final Runnable task ) {
- ListenableFutureTask<Void> futureTask = ListenableFutureTask.create(wrapRunnable(task), null);
- delegate.execute(futureTask);
- return wrapListenableFuture(futureTask);
+ public ListenableFuture<?> submit( Runnable task ){
+ return wrapListenableFuture(super.submit(wrapRunnable(task)));
}
@Override
- public <T> ListenableFuture<T> submit(final Runnable task, final T result) {
- ListenableFutureTask<T> futureTask = ListenableFutureTask.create(wrapRunnable(task), result);
- delegate.execute(futureTask);
- return wrapListenableFuture(futureTask);
+ public <T> ListenableFuture<T> submit( Runnable task, T result ){
+ return wrapListenableFuture(super.submit(wrapRunnable(task), result));
}
private Runnable wrapRunnable(final Runnable task) {
try {
task.run();
} finally {
- deadlockDetector.set(null);
+ deadlockDetector.remove();
}
}
};
try {
return delagate.call();
} finally {
- deadlockDetector.set(null);
+ deadlockDetector.remove();
}
}
};
private <T> ListenableFuture<T> wrapListenableFuture(final ListenableFuture<T> delegate ) {
/*
- * This creates a forwarding Future that overrides calls to get(...) to check, via the ThreadLocal,
- * if the caller is doing a blocking call on a thread from this executor. If so, we detect this as
- * a deadlock and throw an ExecutionException even though it may not be a deadlock if there are
- * more than 1 thread in the pool. Either way, there's bad practice somewhere, either on the client
- * side for doing a blocking call or in the framework's threading model.
+ * This creates a forwarding Future that overrides calls to get(...) to check, via the
+ * ThreadLocal, if the caller is doing a blocking call on a thread from this executor. If
+ * so, we detect this as a deadlock and throw an ExecutionException even though it may not
+ * be a deadlock if there are more than 1 thread in the pool. Either way, there's bad
+ * practice somewhere, either on the client side for doing a blocking call or in the
+ * framework's threading model.
*/
return new ForwardingListenableFuture.SimpleForwardingListenableFuture<T>(delegate) {
@Override