X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=common%2Futil%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fyangtools%2Futil%2Fconcurrent%2FDeadlockDetectingListeningExecutorService.java;h=ce66d10dcd7419ae90ae6032bfa5a33143440b48;hb=b0b8d9ef5d43eb850a92203345a3e72c6eeaf043;hp=39332be85e092b9c5567316a2e5ac0eb84a07ddb;hpb=9251a7269bd40a0c44df23426b2e73506d04740b;p=yangtools.git diff --git a/common/util/src/main/java/org/opendaylight/yangtools/util/concurrent/DeadlockDetectingListeningExecutorService.java b/common/util/src/main/java/org/opendaylight/yangtools/util/concurrent/DeadlockDetectingListeningExecutorService.java index 39332be85e..ce66d10dcd 100644 --- a/common/util/src/main/java/org/opendaylight/yangtools/util/concurrent/DeadlockDetectingListeningExecutorService.java +++ b/common/util/src/main/java/org/opendaylight/yangtools/util/concurrent/DeadlockDetectingListeningExecutorService.java @@ -8,24 +8,25 @@ package org.opendaylight.yangtools.util.concurrent; -import static com.google.common.base.Preconditions.checkNotNull; - import com.google.common.base.Function; -import com.google.common.util.concurrent.AbstractListeningExecutorService; +import com.google.common.base.Preconditions; +import com.google.common.base.Supplier; import com.google.common.util.concurrent.ForwardingListenableFuture; import com.google.common.util.concurrent.ListenableFuture; -import com.google.common.util.concurrent.ListenableFutureTask; -import java.util.List; import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; +import java.util.concurrent.Executor; import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import javax.annotation.Nonnull; +import javax.annotation.Nullable; + /** * An implementation of ListeningExecutorService that attempts to detect deadlock scenarios that - * could occur if clients invoke the returned Future's get methods synchronously. + * could occur if clients invoke the returned Future's get methods synchronously. *

* Deadlock scenarios are most apt to occur with a backing single-threaded executor where setting of * the Future's result is executed on the single thread. Here's a scenario: @@ -43,12 +44,35 @@ import java.util.concurrent.TimeoutException; * from this class override the get methods to check if the ThreadLocal is set. If it is, * an ExecutionException is thrown with a custom cause. * + * Note that the ThreadLocal is not removed automatically, so some state may be left hanging off of + * threads which have encountered this class. If you need to clean that state up, use + * {@link #cleanStateForCurrentThread()}. + * * @author Thomas Pantelis + * @author Robert Varga */ -public class DeadlockDetectingListeningExecutorService extends AbstractListeningExecutorService { - private final ThreadLocal deadlockDetector = new ThreadLocal<>(); - private final Function deadlockExceptionFunction; - private final ExecutorService delegate; +public class DeadlockDetectingListeningExecutorService extends AsyncNotifyingListeningExecutorService { + /* + * We cannot use a static field simply because our API contract allows nesting, which means some + * tasks may be submitted to underlay and some to overlay service -- and the two cases need to + * be discerned reliably. + */ + private final SettableBooleanThreadLocal deadlockDetector = new SettableBooleanThreadLocal(); + private final Supplier deadlockExceptionFunction; + + // Compatibility wrapper, needs to be removed once the deprecated constructors are gone. + private static final class CompatExceptionSupplier implements Supplier { + private final Function function; + + private CompatExceptionSupplier(final Function function) { + this.function = Preconditions.checkNotNull(function); + } + + @Override + public Exception get() { + return function.apply(null); + } + } /** * Constructor. @@ -56,73 +80,104 @@ public class DeadlockDetectingListeningExecutorService extends AbstractListening * @param delegate the backing ExecutorService. * @param deadlockExceptionFunction Function that returns an Exception instance to set as the * cause of the ExecutionException when a deadlock is detected. + * @deprecated Use {@link #DeadlockDetectingListeningExecutorService(ExecutorService, Supplier)} instead. */ + @Deprecated public DeadlockDetectingListeningExecutorService(final ExecutorService delegate, - final Function deadlockExceptionFunction) { - this.delegate = checkNotNull(delegate); - this.deadlockExceptionFunction = checkNotNull(deadlockExceptionFunction); + final Function deadlockExceptionFunction) { + this(delegate, deadlockExceptionFunction, null); } - @Override - public boolean awaitTermination(final long timeout, final TimeUnit unit) throws InterruptedException { - return delegate.awaitTermination(timeout, unit); + /** + * Constructor. + * + * @param delegate the backing ExecutorService. + * @param deadlockExceptionFunction Function that returns an Exception instance to set as the + * cause of the ExecutionException when a deadlock is detected. + * @param listenableFutureExecutor the executor used to run listener callbacks asynchronously. + * If null, no executor is used. + * @deprecated Use {@link #DeadlockDetectingListeningExecutorService(ExecutorService, Supplier, Executor)} instead. + */ + @Deprecated + public DeadlockDetectingListeningExecutorService(final ExecutorService delegate, + final Function deadlockExceptionFunction, + @Nullable final Executor listenableFutureExecutor) { + super(delegate, listenableFutureExecutor); + this.deadlockExceptionFunction = new CompatExceptionSupplier(deadlockExceptionFunction); } - @Override - public boolean isShutdown() { - return delegate.isShutdown(); + /** + * Constructor. + * + * @param delegate the backing ExecutorService. + * @param deadlockExceptionSupplier Supplier that returns an Exception instance to set as the + * cause of the ExecutionException when a deadlock is detected. + */ + public DeadlockDetectingListeningExecutorService(final ExecutorService delegate, + @Nonnull final Supplier deadlockExceptionSupplier) { + this(delegate, deadlockExceptionSupplier, null); } - @Override - public boolean isTerminated() { - return delegate.isTerminated(); + /** + * Constructor. + * + * @param delegate the backing ExecutorService. + * @param deadlockExceptionSupplier Supplier that returns an Exception instance to set as the + * cause of the ExecutionException when a deadlock is detected. + * @param listenableFutureExecutor the executor used to run listener callbacks asynchronously. + * If null, no executor is used. + */ + public DeadlockDetectingListeningExecutorService(final ExecutorService delegate, + @Nonnull final Supplier deadlockExceptionSupplier, + @Nullable final Executor listenableFutureExecutor ) { + super(delegate, listenableFutureExecutor); + this.deadlockExceptionFunction = Preconditions.checkNotNull(deadlockExceptionSupplier); } @Override - public void shutdown() { - delegate.shutdown(); + public void execute(final Runnable command) { + getDelegate().execute(wrapRunnable(command)); } @Override - public List shutdownNow() { - return delegate.shutdownNow(); + public ListenableFuture submit(final Callable task) { + return wrapListenableFuture(super.submit(wrapCallable(task))); } @Override - public void execute(final Runnable command) { - delegate.execute(wrapRunnable(command)); + public ListenableFuture submit(final Runnable task) { + return wrapListenableFuture(super.submit(wrapRunnable(task))); } @Override - public ListenableFuture submit(final Callable task ) { - final ListenableFutureTask futureTask = ListenableFutureTask.create(wrapCallable(task)); - delegate.execute(futureTask); - return wrapListenableFuture(futureTask); + public ListenableFuture submit(final Runnable task, final T result) { + return wrapListenableFuture(super.submit(wrapRunnable(task), result)); } - @Override - public ListenableFuture submit( final Runnable task ) { - ListenableFutureTask futureTask = ListenableFutureTask.create(wrapRunnable(task), null); - delegate.execute(futureTask); - return wrapListenableFuture(futureTask); + /** + * Remove the state this instance may have attached to the calling thread. If no state + * was attached this method does nothing. + */ + public void cleanStateForCurrentThread() { + deadlockDetector.remove(); } - @Override - public ListenableFuture submit(final Runnable task, final T result) { - ListenableFutureTask futureTask = ListenableFutureTask.create(wrapRunnable(task), result); - delegate.execute(futureTask); - return wrapListenableFuture(futureTask); + private SettableBoolean primeDetector() { + final SettableBoolean b = deadlockDetector.get(); + Preconditions.checkState(!b.isSet(), "Detector for {} has already been primed", this); + b.set(); + return b; } private Runnable wrapRunnable(final Runnable task) { return new Runnable() { @Override public void run() { - deadlockDetector.set(Boolean.TRUE); + final SettableBoolean b = primeDetector(); try { task.run(); } finally { - deadlockDetector.set(null); + b.reset(); } } }; @@ -132,23 +187,24 @@ public class DeadlockDetectingListeningExecutorService extends AbstractListening return new Callable() { @Override public T call() throws Exception { - deadlockDetector.set(Boolean.TRUE); + final SettableBoolean b = primeDetector(); try { return delagate.call(); } finally { - deadlockDetector.set(null); + b.reset(); } } }; } - private ListenableFuture wrapListenableFuture(final ListenableFuture delegate ) { + private ListenableFuture wrapListenableFuture(final ListenableFuture delegate) { /* - * This creates a forwarding Future that overrides calls to get(...) to check, via the ThreadLocal, - * if the caller is doing a blocking call on a thread from this executor. If so, we detect this as - * a deadlock and throw an ExecutionException even though it may not be a deadlock if there are - * more than 1 thread in the pool. Either way, there's bad practice somewhere, either on the client - * side for doing a blocking call or in the framework's threading model. + * This creates a forwarding Future that overrides calls to get(...) to check, via the + * ThreadLocal, if the caller is doing a blocking call on a thread from this executor. If + * so, we detect this as a deadlock and throw an ExecutionException even though it may not + * be a deadlock if there are more than 1 thread in the pool. Either way, there's bad + * practice somewhere, either on the client side for doing a blocking call or in the + * framework's threading model. */ return new ForwardingListenableFuture.SimpleForwardingListenableFuture(delegate) { @Override @@ -165,9 +221,9 @@ public class DeadlockDetectingListeningExecutorService extends AbstractListening } void checkDeadLockDetectorTL() throws ExecutionException { - if (deadlockDetector.get() != null) { + if (deadlockDetector.get().isSet()) { throw new ExecutionException("A potential deadlock was detected.", - deadlockExceptionFunction.apply(null)); + deadlockExceptionFunction.get()); } } };