From 582c7ee3017a5c514b48a3779da40d6ef652c41e Mon Sep 17 00:00:00 2001 From: Robert Varga Date: Wed, 10 Sep 2014 12:42:06 +0200 Subject: [PATCH] BUG-1787: cleanup AsyncNotifyingListenableFutureTask First move the 'listenerExecutor != null' case into a subclass, replacing the check with proper OOP override. Second switch to using SettableBoolean, so we bypass the need to use ThreadLocal.set/remove in fast path. Also introduce an explicit remove method, which can be used to cleanup the thread-local state (which was not previously possible). Change-Id: I6e68cb1ca791c3ea721cb3379a1e8aa6724ab9b8 Signed-off-by: Robert Varga --- .../AsyncNotifyingListenableFutureTask.java | 165 +++++++++++------- 1 file changed, 101 insertions(+), 64 deletions(-) diff --git a/common/util/src/main/java/org/opendaylight/yangtools/util/concurrent/AsyncNotifyingListenableFutureTask.java b/common/util/src/main/java/org/opendaylight/yangtools/util/concurrent/AsyncNotifyingListenableFutureTask.java index 69c94f32a3..2ba16931f1 100644 --- a/common/util/src/main/java/org/opendaylight/yangtools/util/concurrent/AsyncNotifyingListenableFutureTask.java +++ b/common/util/src/main/java/org/opendaylight/yangtools/util/concurrent/AsyncNotifyingListenableFutureTask.java @@ -8,18 +8,21 @@ package org.opendaylight.yangtools.util.concurrent; +import com.google.common.base.Preconditions; +import com.google.common.util.concurrent.ExecutionList; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.ListenableFutureTask; + import java.util.concurrent.Callable; import java.util.concurrent.Executor; import java.util.concurrent.FutureTask; +import javax.annotation.Nonnull; import javax.annotation.Nullable; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.google.common.util.concurrent.ExecutionList; -import com.google.common.util.concurrent.ListenableFuture; - /** * A {@link FutureTask} that also implements the {@link ListenableFuture} interface similar to * guava's {@link ListenableFutureTask}. This class differs from ListenableFutureTask in that it @@ -35,38 +38,91 @@ import com.google.common.util.concurrent.ListenableFuture; * listener Runnable would execute in the thread that completed this task, the listener * is executed on Executor specified on construction. * + * Also note that the use of this task may attach some (small) amount of state to the threads + * interacting with it. That state will not be detached automatically, but you can use + * {@link #cleanStateForCurrentThread()} to clean it up. + * * @author Thomas Pantelis + * @author Robert Varga * * @param the Future result value type */ public class AsyncNotifyingListenableFutureTask extends FutureTask implements ListenableFuture { + private static final class DelegatingAsyncNotifyingListenableFutureTask extends AsyncNotifyingListenableFutureTask { + /** + * The executor used to run listener callbacks. + */ + private final Executor listenerExecutor; + + private DelegatingAsyncNotifyingListenableFutureTask(final Callable callable, @Nullable final Executor listenerExecutor) { + super(callable); + this.listenerExecutor = Preconditions.checkNotNull(listenerExecutor); + } + + private DelegatingAsyncNotifyingListenableFutureTask(final Runnable runnable, @Nullable final V result, + @Nullable final Executor listenerExecutor) { + super(runnable, result); + this.listenerExecutor = Preconditions.checkNotNull(listenerExecutor); + } + + @Override + public void addListener(final Runnable listener, final Executor executor) { + // Wrap the listener Runnable in a DelegatingRunnable. If the specified executor is one that + // runs tasks in the same thread as the caller submitting the task + // (e.g. {@link com.google.common.util.concurrent.MoreExecutors#sameThreadExecutor}) and the + // listener is executed from the #done method, then the DelegatingRunnable will detect this + // via the ThreadLocal and submit the listener Runnable to the listenerExecutor. + // + // On the other hand, if this task is already complete, the call to ExecutionList#add in + // superclass will execute the listener Runnable immediately and, since the ThreadLocal won't be set, + // the DelegatingRunnable will run the listener Runnable inline. + super.addListener(new DelegatingRunnable(listener, listenerExecutor), executor); + } + } - private static final Logger LOG = LoggerFactory.getLogger( AsyncNotifyingListenableFutureTask.class ); + private static final class DelegatingRunnable implements Runnable { + private final Runnable delegate; + private final Executor executor; + + DelegatingRunnable(final Runnable delegate, final Executor executor) { + this.delegate = Preconditions.checkNotNull(delegate); + this.executor = Preconditions.checkNotNull(executor); + } + + @Override + public void run() { + if (ON_TASK_COMPLETION_THREAD_TL.get().isSet()) { + // We're running on the task completion thread so off-load to the executor. + LOG.trace("Submitting ListenenableFuture Runnable from thread {} to executor {}", + Thread.currentThread().getName(), executor); + executor.execute(delegate); + } else { + // We're not running on the task completion thread so run the delegate inline. + LOG.trace("Executing ListenenableFuture Runnable on this thread: {}", + Thread.currentThread().getName()); + delegate.run(); + } + } + } + + private static final Logger LOG = LoggerFactory.getLogger(AsyncNotifyingListenableFutureTask.class); /** * ThreadLocal used to detect if the task completion thread is running the listeners. */ - private static final ThreadLocal ON_TASK_COMPLETION_THREAD_TL = new ThreadLocal<>(); + private static final SettableBooleanThreadLocal ON_TASK_COMPLETION_THREAD_TL = new SettableBooleanThreadLocal(); /** * The execution list to hold our listeners. */ private final ExecutionList executionList = new ExecutionList(); - /** - * The executor used to run listener callbacks. - */ - private final Executor listenerExecutor; - - private AsyncNotifyingListenableFutureTask( Callable callable, @Nullable Executor listenerExecutor ) { - super( callable ); - this.listenerExecutor = listenerExecutor; + private AsyncNotifyingListenableFutureTask(final Callable callable) { + super(callable); } - private AsyncNotifyingListenableFutureTask( Runnable runnable, @Nullable V result, - @Nullable Executor listenerExecutor ) { - super( runnable, result ); - this.listenerExecutor = listenerExecutor; + private AsyncNotifyingListenableFutureTask(final Runnable runnable, @Nullable final V result) { + super(runnable, result); } /** @@ -77,9 +133,13 @@ public class AsyncNotifyingListenableFutureTask extends FutureTask impleme * @param listenerExecutor the executor used to run listener callbacks asynchronously. * If null, no executor is used. */ - public static AsyncNotifyingListenableFutureTask create( Callable callable, - @Nullable Executor listenerExecutor ) { - return new AsyncNotifyingListenableFutureTask( callable, listenerExecutor ); + public static AsyncNotifyingListenableFutureTask create(final Callable callable, + @Nullable final Executor listenerExecutor) { + if (listenerExecutor != null) { + return new DelegatingAsyncNotifyingListenableFutureTask(callable, listenerExecutor); + } else { + return new AsyncNotifyingListenableFutureTask(callable); + } } /** @@ -92,25 +152,26 @@ public class AsyncNotifyingListenableFutureTask extends FutureTask impleme * @param listenerExecutor the executor used to run listener callbacks asynchronously. * If null, no executor is used. */ - public static AsyncNotifyingListenableFutureTask create( Runnable runnable, @Nullable V result, - @Nullable Executor listenerExecutor ) { - return new AsyncNotifyingListenableFutureTask( runnable, result, listenerExecutor ); + public static AsyncNotifyingListenableFutureTask create(final Runnable runnable, @Nullable final V result, + @Nullable final Executor listenerExecutor) { + if (listenerExecutor != null) { + return new DelegatingAsyncNotifyingListenableFutureTask(runnable, result, listenerExecutor); + } else { + return new AsyncNotifyingListenableFutureTask(runnable, result); + } } @Override - public void addListener( Runnable listener, Executor executor ) { - // If a listenerExecutor was specified on construction, wrap the listener Runnable in a - // DelegatingRunnable. If the specified executor is one that runs tasks in the same thread - // as the caller submitting the task (eg MoreExecutors#sameThreadExecutor) and the - // listener is executed from the #done method, then the DelegatingRunnable will detect this - // via the ThreadLocal and submit the listener Runnable to the listenerExecutor. - // - // On the other hand, if this task is already complete, the call to ExecutionList#add below - // will execute the listener Runnable immediately and, since the ThreadLocal won't be set, - // the DelegatingRunnable will run the listener Runnable inline. - - executionList.add( listenerExecutor == null ? listener : - new DelegatingRunnable( listener, listenerExecutor ), executor ); + public void addListener(@Nonnull final Runnable listener, final Executor executor) { + executionList.add(listener, executor); + } + + /** + * Remove the state which may have attached to the calling thread. If no state + * was attached this method does nothing. + */ + public static void cleanStateForCurrentThread() { + ON_TASK_COMPLETION_THREAD_TL.remove(); } /** @@ -118,37 +179,13 @@ public class AsyncNotifyingListenableFutureTask extends FutureTask impleme */ @Override protected void done() { - ON_TASK_COMPLETION_THREAD_TL.set( Boolean.TRUE ); + final SettableBoolean b = ON_TASK_COMPLETION_THREAD_TL.get(); + b.set(); + try { executionList.execute(); } finally { - ON_TASK_COMPLETION_THREAD_TL.remove(); - } - } - - private static class DelegatingRunnable implements Runnable { - - private final Runnable delegate; - private final Executor executor; - - DelegatingRunnable( Runnable delegate, Executor executor ) { - this.delegate = delegate; - this.executor = executor; - } - - @Override - public void run() { - if( ON_TASK_COMPLETION_THREAD_TL.get() == null ) { - // We're not running on the task completion thread so run the delegate inline. - LOG.trace( "Executing ListenenableFuture Runnable on this thread: {}", - Thread.currentThread().getName() ); - delegate.run(); - } else { - // We're running on the task completion thread so off-load to the executor. - LOG.trace( "Submitting ListenenableFuture Runnable to the listenerExecutor", - Thread.currentThread().getName() ); - executor.execute( delegate ); - } + b.reset(); } } } -- 2.36.6