package org.opendaylight.yangtools.util.concurrent;
-import static com.google.common.base.Preconditions.checkNotNull;
-
import com.google.common.base.Function;
-import com.google.common.util.concurrent.AbstractListeningExecutorService;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Supplier;
import com.google.common.util.concurrent.ForwardingListenableFuture;
import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.ListenableFutureTask;
-import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
/**
* An implementation of ListeningExecutorService that attempts to detect deadlock scenarios that
- * could occur if clients invoke the returned Future's <ode>get</code> methods synchronously.
+ * could occur if clients invoke the returned Future's <code>get</code> methods synchronously.
* <p>
* Deadlock scenarios are most apt to occur with a backing single-threaded executor where setting of
* the Future's result is executed on the single thread. Here's a scenario:
* from this class override the <code>get</code> methods to check if the ThreadLocal is set. If it is,
* an ExecutionException is thrown with a custom cause.
*
+ * Note that the ThreadLocal is not removed automatically, so some state may be left hanging off of
+ * threads which have encountered this class. If you need to clean that state up, use
+ * {@link #cleanStateForCurrentThread()}.
+ *
* @author Thomas Pantelis
+ * @author Robert Varga
*/
-public class DeadlockDetectingListeningExecutorService extends AbstractListeningExecutorService {
- private final ThreadLocal<Boolean> deadlockDetector = new ThreadLocal<>();
- private final Function<Void, Exception> deadlockExceptionFunction;
- private final ExecutorService delegate;
+public class DeadlockDetectingListeningExecutorService extends AsyncNotifyingListeningExecutorService {
+ /*
+ * We cannot use a static field simply because our API contract allows nesting, which means some
+ * tasks may be submitted to underlay and some to overlay service -- and the two cases need to
+ * be discerned reliably.
+ */
+ private final SettableBooleanThreadLocal deadlockDetector = new SettableBooleanThreadLocal();
+ private final Supplier<Exception> deadlockExceptionFunction;
+
+ // Compatibility wrapper, needs to be removed once the deprecated constructors are gone.
+ private static final class CompatExceptionSupplier implements Supplier<Exception> {
+ private final Function<Void, Exception> function;
+
+ private CompatExceptionSupplier(final Function<Void, Exception> function) {
+ this.function = Preconditions.checkNotNull(function);
+ }
+
+ @Override
+ public Exception get() {
+ return function.apply(null);
+ }
+ }
/**
* Constructor.
* @param delegate the backing ExecutorService.
* @param deadlockExceptionFunction Function that returns an Exception instance to set as the
* cause of the ExecutionException when a deadlock is detected.
+ * @deprecated Use {@link #DeadlockDetectingListeningExecutorService(ExecutorService, Supplier)} instead.
*/
+ @Deprecated
public DeadlockDetectingListeningExecutorService(final ExecutorService delegate,
- final Function<Void,Exception> deadlockExceptionFunction) {
- this.delegate = checkNotNull(delegate);
- this.deadlockExceptionFunction = checkNotNull(deadlockExceptionFunction);
+ final Function<Void, Exception> deadlockExceptionFunction) {
+ this(delegate, deadlockExceptionFunction, null);
}
- @Override
- public boolean awaitTermination(final long timeout, final TimeUnit unit) throws InterruptedException {
- return delegate.awaitTermination(timeout, unit);
+ /**
+ * Constructor.
+ *
+ * @param delegate the backing ExecutorService.
+ * @param deadlockExceptionFunction Function that returns an Exception instance to set as the
+ * cause of the ExecutionException when a deadlock is detected.
+ * @param listenableFutureExecutor the executor used to run listener callbacks asynchronously.
+ * If null, no executor is used.
+ * @deprecated Use {@link #DeadlockDetectingListeningExecutorService(ExecutorService, Supplier, Executor)} instead.
+ */
+ @Deprecated
+ public DeadlockDetectingListeningExecutorService(final ExecutorService delegate,
+ final Function<Void, Exception> deadlockExceptionFunction,
+ @Nullable final Executor listenableFutureExecutor) {
+ super(delegate, listenableFutureExecutor);
+ this.deadlockExceptionFunction = new CompatExceptionSupplier(deadlockExceptionFunction);
}
- @Override
- public boolean isShutdown() {
- return delegate.isShutdown();
+ /**
+ * Constructor.
+ *
+ * @param delegate the backing ExecutorService.
+ * @param deadlockExceptionSupplier Supplier that returns an Exception instance to set as the
+ * cause of the ExecutionException when a deadlock is detected.
+ */
+ public DeadlockDetectingListeningExecutorService(final ExecutorService delegate,
+ @Nonnull final Supplier<Exception> deadlockExceptionSupplier) {
+ this(delegate, deadlockExceptionSupplier, null);
}
- @Override
- public boolean isTerminated() {
- return delegate.isTerminated();
+ /**
+ * Constructor.
+ *
+ * @param delegate the backing ExecutorService.
+ * @param deadlockExceptionSupplier Supplier that returns an Exception instance to set as the
+ * cause of the ExecutionException when a deadlock is detected.
+ * @param listenableFutureExecutor the executor used to run listener callbacks asynchronously.
+ * If null, no executor is used.
+ */
+ public DeadlockDetectingListeningExecutorService(final ExecutorService delegate,
+ @Nonnull final Supplier<Exception> deadlockExceptionSupplier,
+ @Nullable final Executor listenableFutureExecutor ) {
+ super(delegate, listenableFutureExecutor);
+ this.deadlockExceptionFunction = Preconditions.checkNotNull(deadlockExceptionSupplier);
}
@Override
- public void shutdown() {
- delegate.shutdown();
+ public void execute(final Runnable command) {
+ getDelegate().execute(wrapRunnable(command));
}
@Override
- public List<Runnable> shutdownNow() {
- return delegate.shutdownNow();
+ public <T> ListenableFuture<T> submit(final Callable<T> task) {
+ return wrapListenableFuture(super.submit(wrapCallable(task)));
}
@Override
- public void execute(final Runnable command) {
- delegate.execute(wrapRunnable(command));
+ public ListenableFuture<?> submit(final Runnable task) {
+ return wrapListenableFuture(super.submit(wrapRunnable(task)));
}
@Override
- public <T> ListenableFuture<T> submit(final Callable<T> task ) {
- final ListenableFutureTask<T> futureTask = ListenableFutureTask.create(wrapCallable(task));
- delegate.execute(futureTask);
- return wrapListenableFuture(futureTask);
+ public <T> ListenableFuture<T> submit(final Runnable task, final T result) {
+ return wrapListenableFuture(super.submit(wrapRunnable(task), result));
}
- @Override
- public ListenableFuture<?> submit( final Runnable task ) {
- ListenableFutureTask<Void> futureTask = ListenableFutureTask.create(wrapRunnable(task), null);
- delegate.execute(futureTask);
- return wrapListenableFuture(futureTask);
+ /**
+ * Remove the state this instance may have attached to the calling thread. If no state
+ * was attached this method does nothing.
+ */
+ public void cleanStateForCurrentThread() {
+ deadlockDetector.remove();
}
- @Override
- public <T> ListenableFuture<T> submit(final Runnable task, final T result) {
- ListenableFutureTask<T> futureTask = ListenableFutureTask.create(wrapRunnable(task), result);
- delegate.execute(futureTask);
- return wrapListenableFuture(futureTask);
+ private SettableBoolean primeDetector() {
+ final SettableBoolean b = deadlockDetector.get();
+ Preconditions.checkState(!b.isSet(), "Detector for {} has already been primed", this);
+ b.set();
+ return b;
}
private Runnable wrapRunnable(final Runnable task) {
return new Runnable() {
@Override
public void run() {
- deadlockDetector.set(Boolean.TRUE);
+ final SettableBoolean b = primeDetector();
try {
task.run();
} finally {
- deadlockDetector.set(null);
+ b.reset();
}
}
};
return new Callable<T>() {
@Override
public T call() throws Exception {
- deadlockDetector.set(Boolean.TRUE);
+ final SettableBoolean b = primeDetector();
try {
return delagate.call();
} finally {
- deadlockDetector.set(null);
+ b.reset();
}
}
};
}
- private <T> ListenableFuture<T> wrapListenableFuture(final ListenableFuture<T> delegate ) {
+ private <T> ListenableFuture<T> wrapListenableFuture(final ListenableFuture<T> delegate) {
/*
- * This creates a forwarding Future that overrides calls to get(...) to check, via the ThreadLocal,
- * if the caller is doing a blocking call on a thread from this executor. If so, we detect this as
- * a deadlock and throw an ExecutionException even though it may not be a deadlock if there are
- * more than 1 thread in the pool. Either way, there's bad practice somewhere, either on the client
- * side for doing a blocking call or in the framework's threading model.
+ * This creates a forwarding Future that overrides calls to get(...) to check, via the
+ * ThreadLocal, if the caller is doing a blocking call on a thread from this executor. If
+ * so, we detect this as a deadlock and throw an ExecutionException even though it may not
+ * be a deadlock if there are more than 1 thread in the pool. Either way, there's bad
+ * practice somewhere, either on the client side for doing a blocking call or in the
+ * framework's threading model.
*/
return new ForwardingListenableFuture.SimpleForwardingListenableFuture<T>(delegate) {
@Override
}
void checkDeadLockDetectorTL() throws ExecutionException {
- if (deadlockDetector.get() != null) {
+ if (deadlockDetector.get().isSet()) {
throw new ExecutionException("A potential deadlock was detected.",
- deadlockExceptionFunction.apply(null));
+ deadlockExceptionFunction.get());
}
}
};