package org.opendaylight.yangtools.util.concurrent;
+import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.ExecutionList;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListenableFutureTask;
+
import java.util.concurrent.Callable;
import java.util.concurrent.Executor;
import java.util.concurrent.FutureTask;
+import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.google.common.util.concurrent.ExecutionList;
-import com.google.common.util.concurrent.ListenableFuture;
-
/**
* A {@link FutureTask} that also implements the {@link ListenableFuture} interface similar to
* guava's {@link ListenableFutureTask}. This class differs from ListenableFutureTask in that it
* listener Runnable would execute in the thread that completed this task, the listener
* is executed on Executor specified on construction.
*
+ * Also note that the use of this task may attach some (small) amount of state to the threads
+ * interacting with it. That state will not be detached automatically, but you can use
+ * {@link #cleanStateForCurrentThread()} to clean it up.
+ *
* @author Thomas Pantelis
+ * @author Robert Varga
*
* @param <V> the Future result value type
*/
public class AsyncNotifyingListenableFutureTask<V> extends FutureTask<V> implements ListenableFuture<V> {
+ private static final class DelegatingAsyncNotifyingListenableFutureTask<V> extends AsyncNotifyingListenableFutureTask<V> {
+ /**
+ * The executor used to run listener callbacks.
+ */
+ private final Executor listenerExecutor;
+
+ private DelegatingAsyncNotifyingListenableFutureTask(final Callable<V> callable, @Nullable final Executor listenerExecutor) {
+ super(callable);
+ this.listenerExecutor = Preconditions.checkNotNull(listenerExecutor);
+ }
+
+ private DelegatingAsyncNotifyingListenableFutureTask(final Runnable runnable, @Nullable final V result,
+ @Nullable final Executor listenerExecutor) {
+ super(runnable, result);
+ this.listenerExecutor = Preconditions.checkNotNull(listenerExecutor);
+ }
+
+ @Override
+ public void addListener(final Runnable listener, final Executor executor) {
+ // Wrap the listener Runnable in a DelegatingRunnable. If the specified executor is one that
+ // runs tasks in the same thread as the caller submitting the task
+ // (e.g. {@link com.google.common.util.concurrent.MoreExecutors#sameThreadExecutor}) and the
+ // listener is executed from the #done method, then the DelegatingRunnable will detect this
+ // via the ThreadLocal and submit the listener Runnable to the listenerExecutor.
+ //
+ // On the other hand, if this task is already complete, the call to ExecutionList#add in
+ // superclass will execute the listener Runnable immediately and, since the ThreadLocal won't be set,
+ // the DelegatingRunnable will run the listener Runnable inline.
+ super.addListener(new DelegatingRunnable(listener, listenerExecutor), executor);
+ }
+ }
- private static final Logger LOG = LoggerFactory.getLogger( AsyncNotifyingListenableFutureTask.class );
+ private static final class DelegatingRunnable implements Runnable {
+ private final Runnable delegate;
+ private final Executor executor;
+
+ DelegatingRunnable(final Runnable delegate, final Executor executor) {
+ this.delegate = Preconditions.checkNotNull(delegate);
+ this.executor = Preconditions.checkNotNull(executor);
+ }
+
+ @Override
+ public void run() {
+ if (ON_TASK_COMPLETION_THREAD_TL.get().isSet()) {
+ // We're running on the task completion thread so off-load to the executor.
+ LOG.trace("Submitting ListenenableFuture Runnable from thread {} to executor {}",
+ Thread.currentThread().getName(), executor);
+ executor.execute(delegate);
+ } else {
+ // We're not running on the task completion thread so run the delegate inline.
+ LOG.trace("Executing ListenenableFuture Runnable on this thread: {}",
+ Thread.currentThread().getName());
+ delegate.run();
+ }
+ }
+ }
+
+ private static final Logger LOG = LoggerFactory.getLogger(AsyncNotifyingListenableFutureTask.class);
/**
* ThreadLocal used to detect if the task completion thread is running the listeners.
*/
- private static final ThreadLocal<Boolean> ON_TASK_COMPLETION_THREAD_TL = new ThreadLocal<>();
+ private static final SettableBooleanThreadLocal ON_TASK_COMPLETION_THREAD_TL = new SettableBooleanThreadLocal();
/**
* The execution list to hold our listeners.
*/
private final ExecutionList executionList = new ExecutionList();
- /**
- * The executor used to run listener callbacks.
- */
- private final Executor listenerExecutor;
-
- private AsyncNotifyingListenableFutureTask( Callable<V> callable, @Nullable Executor listenerExecutor ) {
- super( callable );
- this.listenerExecutor = listenerExecutor;
+ private AsyncNotifyingListenableFutureTask(final Callable<V> callable) {
+ super(callable);
}
- private AsyncNotifyingListenableFutureTask( Runnable runnable, @Nullable V result,
- @Nullable Executor listenerExecutor ) {
- super( runnable, result );
- this.listenerExecutor = listenerExecutor;
+ private AsyncNotifyingListenableFutureTask(final Runnable runnable, @Nullable final V result) {
+ super(runnable, result);
}
/**
* @param listenerExecutor the executor used to run listener callbacks asynchronously.
* If null, no executor is used.
*/
- public static <V> AsyncNotifyingListenableFutureTask<V> create( Callable<V> callable,
- @Nullable Executor listenerExecutor ) {
- return new AsyncNotifyingListenableFutureTask<V>( callable, listenerExecutor );
+ public static <V> AsyncNotifyingListenableFutureTask<V> create(final Callable<V> callable,
+ @Nullable final Executor listenerExecutor) {
+ if (listenerExecutor != null) {
+ return new DelegatingAsyncNotifyingListenableFutureTask<V>(callable, listenerExecutor);
+ } else {
+ return new AsyncNotifyingListenableFutureTask<V>(callable);
+ }
}
/**
* @param listenerExecutor the executor used to run listener callbacks asynchronously.
* If null, no executor is used.
*/
- public static <V> AsyncNotifyingListenableFutureTask<V> create( Runnable runnable, @Nullable V result,
- @Nullable Executor listenerExecutor ) {
- return new AsyncNotifyingListenableFutureTask<V>( runnable, result, listenerExecutor );
+ public static <V> AsyncNotifyingListenableFutureTask<V> create(final Runnable runnable, @Nullable final V result,
+ @Nullable final Executor listenerExecutor) {
+ if (listenerExecutor != null) {
+ return new DelegatingAsyncNotifyingListenableFutureTask<V>(runnable, result, listenerExecutor);
+ } else {
+ return new AsyncNotifyingListenableFutureTask<V>(runnable, result);
+ }
}
@Override
- public void addListener( Runnable listener, Executor executor ) {
- // If a listenerExecutor was specified on construction, wrap the listener Runnable in a
- // DelegatingRunnable. If the specified executor is one that runs tasks in the same thread
- // as the caller submitting the task (eg MoreExecutors#sameThreadExecutor) and the
- // listener is executed from the #done method, then the DelegatingRunnable will detect this
- // via the ThreadLocal and submit the listener Runnable to the listenerExecutor.
- //
- // On the other hand, if this task is already complete, the call to ExecutionList#add below
- // will execute the listener Runnable immediately and, since the ThreadLocal won't be set,
- // the DelegatingRunnable will run the listener Runnable inline.
-
- executionList.add( listenerExecutor == null ? listener :
- new DelegatingRunnable( listener, listenerExecutor ), executor );
+ public void addListener(@Nonnull final Runnable listener, final Executor executor) {
+ executionList.add(listener, executor);
+ }
+
+ /**
+ * Remove the state which may have attached to the calling thread. If no state
+ * was attached this method does nothing.
+ */
+ public static void cleanStateForCurrentThread() {
+ ON_TASK_COMPLETION_THREAD_TL.remove();
}
/**
*/
@Override
protected void done() {
- ON_TASK_COMPLETION_THREAD_TL.set( Boolean.TRUE );
+ final SettableBoolean b = ON_TASK_COMPLETION_THREAD_TL.get();
+ b.set();
+
try {
executionList.execute();
} finally {
- ON_TASK_COMPLETION_THREAD_TL.remove();
- }
- }
-
- private static class DelegatingRunnable implements Runnable {
-
- private final Runnable delegate;
- private final Executor executor;
-
- DelegatingRunnable( Runnable delegate, Executor executor ) {
- this.delegate = delegate;
- this.executor = executor;
- }
-
- @Override
- public void run() {
- if( ON_TASK_COMPLETION_THREAD_TL.get() == null ) {
- // We're not running on the task completion thread so run the delegate inline.
- LOG.trace( "Executing ListenenableFuture Runnable on this thread: {}",
- Thread.currentThread().getName() );
- delegate.run();
- } else {
- // We're running on the task completion thread so off-load to the executor.
- LOG.trace( "Submitting ListenenableFuture Runnable to the listenerExecutor",
- Thread.currentThread().getName() );
- executor.execute( delegate );
- }
+ b.reset();
}
}
}