BUG-1787: cleanup AsyncNotifyingListenableFutureTask 85/10985/3
authorRobert Varga <rovarga@cisco.com>
Wed, 10 Sep 2014 10:42:06 +0000 (12:42 +0200)
committerRobert Varga <rovarga@cisco.com>
Wed, 10 Sep 2014 15:04:29 +0000 (17:04 +0200)
First move the 'listenerExecutor != null' case into a subclass,
replacing the check with proper OOP override.

Second switch to using SettableBoolean, so we bypass the need to use
ThreadLocal.set/remove in fast path. Also introduce an explicit remove
method, which can be used to cleanup the thread-local state (which was
not previously possible).

Change-Id: I6e68cb1ca791c3ea721cb3379a1e8aa6724ab9b8
Signed-off-by: Robert Varga <rovarga@cisco.com>
common/util/src/main/java/org/opendaylight/yangtools/util/concurrent/AsyncNotifyingListenableFutureTask.java

index 69c94f32a35cfc1e7b1ec9bf71e6287bd88b7acc..2ba16931f18a8f687836415c8da213e2a6cc3bab 100644 (file)
@@ -8,18 +8,21 @@
 
 package org.opendaylight.yangtools.util.concurrent;
 
+import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.ExecutionList;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListenableFutureTask;
+
 import java.util.concurrent.Callable;
 import java.util.concurrent.Executor;
 import java.util.concurrent.FutureTask;
 
+import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.common.util.concurrent.ExecutionList;
-import com.google.common.util.concurrent.ListenableFuture;
-
 /**
  * A {@link FutureTask} that also implements the {@link ListenableFuture} interface similar to
  * guava's {@link ListenableFutureTask}. This class differs from ListenableFutureTask in that it
@@ -35,38 +38,91 @@ import com.google.common.util.concurrent.ListenableFuture;
  * listener Runnable would execute in the thread that completed this task, the listener
  * is executed on Executor specified on construction.
  *
+ * Also note that the use of this task may attach some (small) amount of state to the threads
+ * interacting with it. That state will not be detached automatically, but you can use
+ *  {@link #cleanStateForCurrentThread()} to clean it up.
+ *
  * @author Thomas Pantelis
+ * @author Robert Varga
  *
  * @param <V> the Future result value type
  */
 public class AsyncNotifyingListenableFutureTask<V> extends FutureTask<V> implements ListenableFuture<V> {
+    private static final class DelegatingAsyncNotifyingListenableFutureTask<V> extends AsyncNotifyingListenableFutureTask<V> {
+        /**
+         * The executor used to run listener callbacks.
+         */
+        private final Executor listenerExecutor;
+
+        private DelegatingAsyncNotifyingListenableFutureTask(final Callable<V> callable, @Nullable final Executor listenerExecutor) {
+            super(callable);
+            this.listenerExecutor = Preconditions.checkNotNull(listenerExecutor);
+        }
+
+        private DelegatingAsyncNotifyingListenableFutureTask(final Runnable runnable, @Nullable final V result,
+                @Nullable final Executor listenerExecutor) {
+            super(runnable, result);
+            this.listenerExecutor = Preconditions.checkNotNull(listenerExecutor);
+        }
+
+        @Override
+        public void addListener(final Runnable listener, final Executor executor) {
+            // Wrap the listener Runnable in a DelegatingRunnable. If the specified executor is one that
+            // runs tasks in the same thread as the caller submitting the task
+            // (e.g. {@link com.google.common.util.concurrent.MoreExecutors#sameThreadExecutor}) and the
+            // listener is executed from the #done method, then the DelegatingRunnable will detect this
+            // via the ThreadLocal and submit the listener Runnable to the listenerExecutor.
+            //
+            // On the other hand, if this task is already complete, the call to ExecutionList#add in
+            // superclass will execute the listener Runnable immediately and, since the ThreadLocal won't be set,
+            // the DelegatingRunnable will run the listener Runnable inline.
+            super.addListener(new DelegatingRunnable(listener, listenerExecutor), executor);
+        }
+    }
 
-    private static final Logger LOG = LoggerFactory.getLogger( AsyncNotifyingListenableFutureTask.class );
+    private static final class DelegatingRunnable implements Runnable {
+        private final Runnable delegate;
+        private final Executor executor;
+
+        DelegatingRunnable(final Runnable delegate, final Executor executor) {
+            this.delegate = Preconditions.checkNotNull(delegate);
+            this.executor = Preconditions.checkNotNull(executor);
+        }
+
+        @Override
+        public void run() {
+            if (ON_TASK_COMPLETION_THREAD_TL.get().isSet()) {
+                // We're running on the task completion thread so off-load to the executor.
+                LOG.trace("Submitting ListenenableFuture Runnable from thread {} to executor {}",
+                        Thread.currentThread().getName(), executor);
+                executor.execute(delegate);
+            } else {
+                // We're not running on the task completion thread so run the delegate inline.
+                LOG.trace("Executing ListenenableFuture Runnable on this thread: {}",
+                        Thread.currentThread().getName());
+                delegate.run();
+            }
+        }
+    }
+
+    private static final Logger LOG = LoggerFactory.getLogger(AsyncNotifyingListenableFutureTask.class);
 
     /**
      * ThreadLocal used to detect if the task completion thread is running the listeners.
      */
-    private static final ThreadLocal<Boolean> ON_TASK_COMPLETION_THREAD_TL = new ThreadLocal<>();
+    private static final SettableBooleanThreadLocal ON_TASK_COMPLETION_THREAD_TL = new SettableBooleanThreadLocal();
 
     /**
      *  The execution list to hold our listeners.
      */
     private final ExecutionList executionList = new ExecutionList();
 
-    /**
-     * The executor used to run listener callbacks.
-     */
-    private final Executor listenerExecutor;
-
-    private AsyncNotifyingListenableFutureTask( Callable<V> callable, @Nullable Executor listenerExecutor ) {
-        super( callable );
-        this.listenerExecutor = listenerExecutor;
+    private AsyncNotifyingListenableFutureTask(final Callable<V> callable) {
+        super(callable);
     }
 
-    private AsyncNotifyingListenableFutureTask( Runnable runnable, @Nullable V result,
-            @Nullable Executor listenerExecutor ) {
-        super( runnable, result );
-        this.listenerExecutor = listenerExecutor;
+    private AsyncNotifyingListenableFutureTask(final Runnable runnable, @Nullable final V result) {
+        super(runnable, result);
     }
 
     /**
@@ -77,9 +133,13 @@ public class AsyncNotifyingListenableFutureTask<V> extends FutureTask<V> impleme
      * @param listenerExecutor the executor used to run listener callbacks asynchronously.
      *                         If null, no executor is used.
      */
-    public static <V> AsyncNotifyingListenableFutureTask<V> create( Callable<V> callable,
-            @Nullable Executor listenerExecutor ) {
-      return new AsyncNotifyingListenableFutureTask<V>( callable, listenerExecutor );
+    public static <V> AsyncNotifyingListenableFutureTask<V> create(final Callable<V> callable,
+            @Nullable final Executor listenerExecutor) {
+        if (listenerExecutor != null) {
+            return new DelegatingAsyncNotifyingListenableFutureTask<V>(callable, listenerExecutor);
+        } else {
+            return new AsyncNotifyingListenableFutureTask<V>(callable);
+        }
     }
 
     /**
@@ -92,25 +152,26 @@ public class AsyncNotifyingListenableFutureTask<V> extends FutureTask<V> impleme
      * @param listenerExecutor the executor used to run listener callbacks asynchronously.
      *                         If null, no executor is used.
      */
-    public static <V> AsyncNotifyingListenableFutureTask<V> create( Runnable runnable, @Nullable V result,
-            @Nullable Executor listenerExecutor ) {
-      return new AsyncNotifyingListenableFutureTask<V>( runnable, result, listenerExecutor );
+    public static <V> AsyncNotifyingListenableFutureTask<V> create(final Runnable runnable, @Nullable final V result,
+            @Nullable final Executor listenerExecutor) {
+        if (listenerExecutor != null) {
+            return new DelegatingAsyncNotifyingListenableFutureTask<V>(runnable, result, listenerExecutor);
+        } else {
+            return new AsyncNotifyingListenableFutureTask<V>(runnable, result);
+        }
     }
 
     @Override
-    public void addListener( Runnable listener, Executor executor ) {
-        // If a listenerExecutor was specified on construction, wrap the listener Runnable in a
-        // DelegatingRunnable. If the specified executor is one that runs tasks in the same thread
-        // as the caller submitting the task (eg MoreExecutors#sameThreadExecutor) and the
-        // listener is executed from the #done method, then the DelegatingRunnable will detect this
-        // via the ThreadLocal and submit the listener Runnable to the listenerExecutor.
-        //
-        // On the other hand, if this task is already complete, the call to ExecutionList#add below
-        // will execute the listener Runnable immediately and, since the ThreadLocal won't be set,
-        // the DelegatingRunnable will run the listener Runnable inline.
-
-        executionList.add( listenerExecutor == null ? listener :
-            new DelegatingRunnable( listener, listenerExecutor ), executor );
+    public void addListener(@Nonnull final Runnable listener, final Executor executor) {
+        executionList.add(listener, executor);
+    }
+
+    /**
+     * Remove the state which may have attached to the calling thread. If no state
+     * was attached this method does nothing.
+     */
+    public static void cleanStateForCurrentThread() {
+        ON_TASK_COMPLETION_THREAD_TL.remove();
     }
 
     /**
@@ -118,37 +179,13 @@ public class AsyncNotifyingListenableFutureTask<V> extends FutureTask<V> impleme
      */
     @Override
     protected void done() {
-        ON_TASK_COMPLETION_THREAD_TL.set( Boolean.TRUE );
+        final SettableBoolean b = ON_TASK_COMPLETION_THREAD_TL.get();
+        b.set();
+
         try {
             executionList.execute();
         } finally {
-            ON_TASK_COMPLETION_THREAD_TL.remove();
-        }
-    }
-
-    private static class DelegatingRunnable implements Runnable {
-
-        private final Runnable delegate;
-        private final Executor executor;
-
-        DelegatingRunnable( Runnable delegate, Executor executor ) {
-            this.delegate = delegate;
-            this.executor = executor;
-        }
-
-        @Override
-        public void run() {
-            if( ON_TASK_COMPLETION_THREAD_TL.get() == null ) {
-                // We're not running on the task completion thread so run the delegate inline.
-                LOG.trace( "Executing ListenenableFuture Runnable on this thread: {}",
-                        Thread.currentThread().getName() );
-                delegate.run();
-            } else {
-                // We're running on the task completion thread so off-load to the executor.
-                LOG.trace( "Submitting ListenenableFuture Runnable to the listenerExecutor",
-                        Thread.currentThread().getName() );
-                executor.execute( delegate );
-            }
+            b.reset();
         }
     }
 }