Propagate @Nonnull and @Nullable annotations
[yangtools.git] / common / util / src / main / java / org / opendaylight / yangtools / util / concurrent / DeadlockDetectingListeningExecutorService.java
index 39332be85e092b9c5567316a2e5ac0eb84a07ddb..fad71a93525e9d33b4951a71d2e80f63f6d28719 100644 (file)
@@ -8,26 +8,24 @@
 
 package org.opendaylight.yangtools.util.concurrent;
 
-import static com.google.common.base.Preconditions.checkNotNull;
-
-import com.google.common.base.Function;
-import com.google.common.util.concurrent.AbstractListeningExecutorService;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Supplier;
 import com.google.common.util.concurrent.ForwardingListenableFuture;
 import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.ListenableFutureTask;
-
-import java.util.List;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executor;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
 
 /**
  * An implementation of ListeningExecutorService that attempts to detect deadlock scenarios that
- * could occur if clients invoke the returned Future's <ode>get</code> methods synchronously.
- * <p>
- * Deadlock scenarios are most apt to occur with a backing single-threaded executor where setting of
+ * could occur if clients invoke the returned Future's <code>get</code> methods synchronously.
+ *
+ * <p>Deadlock scenarios are most apt to occur with a backing single-threaded executor where setting of
  * the Future's result is executed on the single thread. Here's a scenario:
  * <ul>
  * <li>Client code is currently executing in an executor's single thread.</li>
@@ -37,118 +35,124 @@ import java.util.concurrent.TimeoutException;
  * The second submitted task will never execute since the single thread is currently executing
  * the client code which is blocked waiting for the submitted task to complete. Thus, deadlock has
  * occurred.
- * <p>
- * This class prevents this scenario via the use of a ThreadLocal variable. When a task is invoked,
+ *
+ * <p>This class prevents this scenario via the use of a ThreadLocal variable. When a task is invoked,
  * the ThreadLocal is set and, when a task completes, the ThreadLocal is cleared. Futures returned
  * from this class override the <code>get</code> methods to check if the ThreadLocal is set. If it is,
  * an ExecutionException is thrown with a custom cause.
  *
+ * <p>Note that the ThreadLocal is not removed automatically, so some state may be left hanging off of
+ * threads which have encountered this class. If you need to clean that state up, use
+ * {@link #cleanStateForCurrentThread()}.
+ *
  * @author Thomas Pantelis
+ * @author Robert Varga
  */
-public class DeadlockDetectingListeningExecutorService extends AbstractListeningExecutorService {
-    private final ThreadLocal<Boolean> deadlockDetector = new ThreadLocal<>();
-    private final Function<Void, Exception> deadlockExceptionFunction;
-    private final ExecutorService delegate;
+public class DeadlockDetectingListeningExecutorService extends AsyncNotifyingListeningExecutorService {
+    /*
+     * We cannot use a static field simply because our API contract allows nesting, which means some
+     * tasks may be submitted to underlay and some to overlay service -- and the two cases need to
+     * be discerned reliably.
+     */
+    private final SettableBooleanThreadLocal deadlockDetector = new SettableBooleanThreadLocal();
+    private final Supplier<Exception> deadlockExceptionFunction;
 
     /**
      * Constructor.
      *
      * @param delegate the backing ExecutorService.
-     * @param deadlockExceptionFunction Function that returns an Exception instance to set as the
+     * @param deadlockExceptionSupplier Supplier that returns an Exception instance to set as the
      *             cause of the ExecutionException when a deadlock is detected.
      */
     public DeadlockDetectingListeningExecutorService(final ExecutorService delegate,
-            final Function<Void,Exception> deadlockExceptionFunction) {
-        this.delegate = checkNotNull(delegate);
-        this.deadlockExceptionFunction = checkNotNull(deadlockExceptionFunction);
+            @Nonnull final Supplier<Exception> deadlockExceptionSupplier) {
+        this(delegate, deadlockExceptionSupplier, null);
     }
 
-    @Override
-    public boolean awaitTermination(final long timeout, final TimeUnit unit) throws InterruptedException {
-        return delegate.awaitTermination(timeout, unit);
-    }
-
-    @Override
-    public boolean isShutdown() {
-        return delegate.isShutdown();
-    }
-
-    @Override
-    public boolean isTerminated() {
-        return delegate.isTerminated();
+    /**
+     * Constructor.
+     *
+     * @param delegate the backing ExecutorService.
+     * @param deadlockExceptionSupplier Supplier that returns an Exception instance to set as the
+     *             cause of the ExecutionException when a deadlock is detected.
+     * @param listenableFutureExecutor the executor used to run listener callbacks asynchronously.
+     *             If null, no executor is used.
+     */
+    public DeadlockDetectingListeningExecutorService(final ExecutorService delegate,
+            @Nonnull final Supplier<Exception> deadlockExceptionSupplier,
+            @Nullable final Executor listenableFutureExecutor ) {
+        super(delegate, listenableFutureExecutor);
+        this.deadlockExceptionFunction = Preconditions.checkNotNull(deadlockExceptionSupplier);
     }
 
     @Override
-    public void shutdown() {
-        delegate.shutdown();
+    public void execute(@Nonnull final Runnable command) {
+        getDelegate().execute(wrapRunnable(command));
     }
 
+    @Nonnull
     @Override
-    public List<Runnable> shutdownNow() {
-        return delegate.shutdownNow();
+    public <T> ListenableFuture<T> submit(final Callable<T> task) {
+        return wrapListenableFuture(super.submit(wrapCallable(task)));
     }
 
+    @Nonnull
     @Override
-    public void execute(final Runnable command) {
-        delegate.execute(wrapRunnable(command));
+    public ListenableFuture<?> submit(final Runnable task) {
+        return wrapListenableFuture(super.submit(wrapRunnable(task)));
     }
 
+    @Nonnull
     @Override
-    public <T> ListenableFuture<T> submit(final Callable<T> task ) {
-        final ListenableFutureTask<T> futureTask = ListenableFutureTask.create(wrapCallable(task));
-        delegate.execute(futureTask);
-        return wrapListenableFuture(futureTask);
+    public <T> ListenableFuture<T> submit(final Runnable task, final T result) {
+        return wrapListenableFuture(super.submit(wrapRunnable(task), result));
     }
 
-    @Override
-    public ListenableFuture<?> submit( final Runnable task ) {
-        ListenableFutureTask<Void> futureTask = ListenableFutureTask.create(wrapRunnable(task), null);
-        delegate.execute(futureTask);
-        return wrapListenableFuture(futureTask);
+    /**
+     * Remove the state this instance may have attached to the calling thread. If no state
+     * was attached this method does nothing.
+     */
+    public void cleanStateForCurrentThread() {
+        deadlockDetector.remove();
     }
 
-    @Override
-    public <T> ListenableFuture<T> submit(final Runnable task, final T result) {
-        ListenableFutureTask<T> futureTask = ListenableFutureTask.create(wrapRunnable(task), result);
-        delegate.execute(futureTask);
-        return wrapListenableFuture(futureTask);
+    private SettableBoolean primeDetector() {
+        final SettableBoolean b = deadlockDetector.get();
+        Preconditions.checkState(!b.isSet(), "Detector for {} has already been primed", this);
+        b.set();
+        return b;
     }
 
     private Runnable wrapRunnable(final Runnable task) {
-        return new Runnable() {
-            @Override
-            public void run() {
-                deadlockDetector.set(Boolean.TRUE);
-                try {
-                    task.run();
-                } finally {
-                    deadlockDetector.set(null);
-                }
+        return () -> {
+            final SettableBoolean b = primeDetector();
+            try {
+                task.run();
+            } finally {
+                b.reset();
             }
         };
     }
 
     private <T> Callable<T> wrapCallable(final Callable<T> delagate) {
-        return new Callable<T>() {
-            @Override
-            public T call() throws Exception {
-                deadlockDetector.set(Boolean.TRUE);
-                try {
-                    return delagate.call();
-                } finally {
-                    deadlockDetector.set(null);
-                }
+        return () -> {
+            final SettableBoolean b = primeDetector();
+            try {
+                return delagate.call();
+            } finally {
+                b.reset();
             }
         };
     }
 
-    private <T> ListenableFuture<T> wrapListenableFuture(final ListenableFuture<T> delegate ) {
+    private <T> ListenableFuture<T> wrapListenableFuture(final ListenableFuture<T> delegate) {
         /*
-         *  This creates a forwarding Future that overrides calls to get(...) to check, via the ThreadLocal,
-         * if the caller is doing a blocking call on a thread from this executor. If so, we detect this as
-         * a deadlock and throw an ExecutionException even though it may not be a deadlock if there are
-         * more than 1 thread in the pool. Either way, there's bad practice somewhere, either on the client
-         * side for doing a blocking call or in the framework's threading model.
+         * This creates a forwarding Future that overrides calls to get(...) to check, via the
+         * ThreadLocal, if the caller is doing a blocking call on a thread from this executor. If
+         * so, we detect this as a deadlock and throw an ExecutionException even though it may not
+         * be a deadlock if there are more than 1 thread in the pool. Either way, there's bad
+         * practice somewhere, either on the client side for doing a blocking call or in the
+         * framework's threading model.
          */
         return new ForwardingListenableFuture.SimpleForwardingListenableFuture<T>(delegate) {
             @Override
@@ -158,16 +162,16 @@ public class DeadlockDetectingListeningExecutorService extends AbstractListening
             }
 
             @Override
-            public T get(final long timeout, final TimeUnit unit)
+            public T get(final long timeout, @Nonnull final TimeUnit unit)
                     throws InterruptedException, ExecutionException, TimeoutException {
                 checkDeadLockDetectorTL();
                 return super.get(timeout, unit);
             }
 
             void checkDeadLockDetectorTL() throws ExecutionException {
-                if (deadlockDetector.get() != null) {
+                if (deadlockDetector.get().isSet()) {
                     throw new ExecutionException("A potential deadlock was detected.",
-                            deadlockExceptionFunction.apply(null));
+                            deadlockExceptionFunction.get());
                 }
             }
         };