Bug 1430: New common/util concurrent classes
[yangtools.git] / common / util / src / main / java / org / opendaylight / yangtools / util / concurrent / DeadlockDetectingListeningExecutorService.java
index 39332be85e092b9c5567316a2e5ac0eb84a07ddb..011872d6b138d9edbaf28869524121bd859f0b43 100644 (file)
@@ -11,18 +11,17 @@ package org.opendaylight.yangtools.util.concurrent;
 import static com.google.common.base.Preconditions.checkNotNull;
 
 import com.google.common.base.Function;
-import com.google.common.util.concurrent.AbstractListeningExecutorService;
 import com.google.common.util.concurrent.ForwardingListenableFuture;
 import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.ListenableFutureTask;
-
-import java.util.List;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executor;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 
+import javax.annotation.Nullable;
+
 /**
  * An implementation of ListeningExecutorService that attempts to detect deadlock scenarios that
  * could occur if clients invoke the returned Future's <ode>get</code> methods synchronously.
@@ -45,10 +44,9 @@ import java.util.concurrent.TimeoutException;
  *
  * @author Thomas Pantelis
  */
-public class DeadlockDetectingListeningExecutorService extends AbstractListeningExecutorService {
+public class DeadlockDetectingListeningExecutorService extends AsyncNotifyingListeningExecutorService {
     private final ThreadLocal<Boolean> deadlockDetector = new ThreadLocal<>();
     private final Function<Void, Exception> deadlockExceptionFunction;
-    private final ExecutorService delegate;
 
     /**
      * Constructor.
@@ -57,61 +55,45 @@ public class DeadlockDetectingListeningExecutorService extends AbstractListening
      * @param deadlockExceptionFunction Function that returns an Exception instance to set as the
      *             cause of the ExecutionException when a deadlock is detected.
      */
-    public DeadlockDetectingListeningExecutorService(final ExecutorService delegate,
-            final Function<Void,Exception> deadlockExceptionFunction) {
-        this.delegate = checkNotNull(delegate);
-        this.deadlockExceptionFunction = checkNotNull(deadlockExceptionFunction);
-    }
-
-    @Override
-    public boolean awaitTermination(final long timeout, final TimeUnit unit) throws InterruptedException {
-        return delegate.awaitTermination(timeout, unit);
-    }
-
-    @Override
-    public boolean isShutdown() {
-        return delegate.isShutdown();
-    }
-
-    @Override
-    public boolean isTerminated() {
-        return delegate.isTerminated();
+    public DeadlockDetectingListeningExecutorService( ExecutorService delegate,
+                                          Function<Void,Exception> deadlockExceptionFunction ) {
+        this(delegate, deadlockExceptionFunction, null);
     }
 
-    @Override
-    public void shutdown() {
-        delegate.shutdown();
-    }
-
-    @Override
-    public List<Runnable> shutdownNow() {
-        return delegate.shutdownNow();
+    /**
+     * Constructor.
+     *
+     * @param delegate the backing ExecutorService.
+     * @param deadlockExceptionFunction Function that returns an Exception instance to set as the
+     *             cause of the ExecutionException when a deadlock is detected.
+     * @param listenableFutureExecutor the executor used to run listener callbacks asynchronously.
+     *             If null, no executor is used.
+     */
+    public DeadlockDetectingListeningExecutorService( ExecutorService delegate,
+                                          Function<Void,Exception> deadlockExceptionFunction,
+                                          @Nullable Executor listenableFutureExecutor ) {
+        super(delegate, listenableFutureExecutor);
+        this.deadlockExceptionFunction = checkNotNull(deadlockExceptionFunction);
     }
 
     @Override
-    public void execute(final Runnable command) {
-        delegate.execute(wrapRunnable(command));
+    public void execute( Runnable command ){
+        getDelegate().execute(wrapRunnable(command));
     }
 
     @Override
-    public <T> ListenableFuture<T> submit(final Callable<T> task ) {
-        final ListenableFutureTask<T> futureTask = ListenableFutureTask.create(wrapCallable(task));
-        delegate.execute(futureTask);
-        return wrapListenableFuture(futureTask);
+    public <T> ListenableFuture<T> submit( Callable<T> task ){
+        return wrapListenableFuture(super.submit(wrapCallable(task)));
     }
 
     @Override
-    public ListenableFuture<?> submit( final Runnable task ) {
-        ListenableFutureTask<Void> futureTask = ListenableFutureTask.create(wrapRunnable(task), null);
-        delegate.execute(futureTask);
-        return wrapListenableFuture(futureTask);
+    public ListenableFuture<?> submit( Runnable task ){
+        return wrapListenableFuture(super.submit(wrapRunnable(task)));
     }
 
     @Override
-    public <T> ListenableFuture<T> submit(final Runnable task, final T result) {
-        ListenableFutureTask<T> futureTask = ListenableFutureTask.create(wrapRunnable(task), result);
-        delegate.execute(futureTask);
-        return wrapListenableFuture(futureTask);
+    public <T> ListenableFuture<T> submit( Runnable task, T result ){
+        return wrapListenableFuture(super.submit(wrapRunnable(task), result));
     }
 
     private Runnable wrapRunnable(final Runnable task) {
@@ -122,7 +104,7 @@ public class DeadlockDetectingListeningExecutorService extends AbstractListening
                 try {
                     task.run();
                 } finally {
-                    deadlockDetector.set(null);
+                    deadlockDetector.remove();
                 }
             }
         };
@@ -136,7 +118,7 @@ public class DeadlockDetectingListeningExecutorService extends AbstractListening
                 try {
                     return delagate.call();
                 } finally {
-                    deadlockDetector.set(null);
+                    deadlockDetector.remove();
                 }
             }
         };
@@ -144,11 +126,12 @@ public class DeadlockDetectingListeningExecutorService extends AbstractListening
 
     private <T> ListenableFuture<T> wrapListenableFuture(final ListenableFuture<T> delegate ) {
         /*
-         *  This creates a forwarding Future that overrides calls to get(...) to check, via the ThreadLocal,
-         * if the caller is doing a blocking call on a thread from this executor. If so, we detect this as
-         * a deadlock and throw an ExecutionException even though it may not be a deadlock if there are
-         * more than 1 thread in the pool. Either way, there's bad practice somewhere, either on the client
-         * side for doing a blocking call or in the framework's threading model.
+         * This creates a forwarding Future that overrides calls to get(...) to check, via the
+         * ThreadLocal, if the caller is doing a blocking call on a thread from this executor. If
+         * so, we detect this as a deadlock and throw an ExecutionException even though it may not
+         * be a deadlock if there are more than 1 thread in the pool. Either way, there's bad
+         * practice somewhere, either on the client side for doing a blocking call or in the
+         * framework's threading model.
          */
         return new ForwardingListenableFuture.SimpleForwardingListenableFuture<T>(delegate) {
             @Override