2 * Copyright (c) 2014 Brocade Communications Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.yangtools.util.concurrent;
11 import com.google.common.base.Preconditions;
12 import com.google.common.util.concurrent.ExecutionList;
13 import com.google.common.util.concurrent.ListenableFuture;
14 import com.google.common.util.concurrent.ListenableFutureTask;
15 import java.util.concurrent.Callable;
16 import java.util.concurrent.Executor;
17 import java.util.concurrent.FutureTask;
18 import javax.annotation.Nonnull;
19 import javax.annotation.Nullable;
20 import org.slf4j.Logger;
21 import org.slf4j.LoggerFactory;
24 * A {@link FutureTask} that also implements the {@link ListenableFuture} interface similar to
25 * guava's {@link ListenableFutureTask}. This class differs from ListenableFutureTask in that it
26 * allows an {@link Executor} to be specified on construction that is used to execute listener
27 * callback Runnables, registered via {@link #addListener}, asynchronously when this task completes.
28 * This is useful when you want to guarantee listener executions are off-loaded onto another thread
29 * to avoid blocking the thread that completed this task, as a common use case is to pass an
30 * executor that runs tasks in the same thread as the caller (ie MoreExecutors#sameThreadExecutor)
31 * to {@link #addListener}.
33 * <p>Note: the Executor specified on construction does not replace the Executor specified in
34 * {@link #addListener}. The latter Executor is still used however, if it is detected that the
35 * listener Runnable would execute in the thread that completed this task, the listener
36 * is executed on Executor specified on construction.
38 * <p>Also note that the use of this task may attach some (small) amount of state to the threads
39 * interacting with it. That state will not be detached automatically, but you can use
40 * {@link #cleanStateForCurrentThread()} to clean it up.
42 * @author Thomas Pantelis
43 * @author Robert Varga
45 * @param <V> the Future result value type
47 public class AsyncNotifyingListenableFutureTask<V> extends FutureTask<V> implements ListenableFuture<V> {
49 private static final class DelegatingAsyncNotifyingListenableFutureTask<V>
50 extends AsyncNotifyingListenableFutureTask<V> {
53 * The executor used to run listener callbacks.
55 private final Executor listenerExecutor;
57 private DelegatingAsyncNotifyingListenableFutureTask(final Callable<V> callable,
58 @Nullable final Executor listenerExecutor) {
60 this.listenerExecutor = Preconditions.checkNotNull(listenerExecutor);
63 private DelegatingAsyncNotifyingListenableFutureTask(final Runnable runnable, @Nullable final V result,
64 @Nullable final Executor listenerExecutor) {
65 super(runnable, result);
66 this.listenerExecutor = Preconditions.checkNotNull(listenerExecutor);
70 public void addListener(final Runnable listener, final Executor executor) {
71 // Wrap the listener Runnable in a DelegatingRunnable. If the specified executor is one that
72 // runs tasks in the same thread as the caller submitting the task
73 // (e.g. {@link com.google.common.util.concurrent.MoreExecutors#sameThreadExecutor}) and the
74 // listener is executed from the #done method, then the DelegatingRunnable will detect this
75 // via the ThreadLocal and submit the listener Runnable to the listenerExecutor.
77 // On the other hand, if this task is already complete, the call to ExecutionList#add in
78 // superclass will execute the listener Runnable immediately and, since the ThreadLocal won't be set,
79 // the DelegatingRunnable will run the listener Runnable inline.
80 super.addListener(new DelegatingRunnable(listener, listenerExecutor), executor);
84 private static final class DelegatingRunnable implements Runnable {
85 private final Runnable delegate;
86 private final Executor executor;
88 DelegatingRunnable(final Runnable delegate, final Executor executor) {
89 this.delegate = Preconditions.checkNotNull(delegate);
90 this.executor = Preconditions.checkNotNull(executor);
95 if (ON_TASK_COMPLETION_THREAD_TL.get().isSet()) {
96 // We're running on the task completion thread so off-load to the executor.
97 LOG.trace("Submitting ListenenableFuture Runnable from thread {} to executor {}",
98 Thread.currentThread().getName(), executor);
99 executor.execute(delegate);
101 // We're not running on the task completion thread so run the delegate inline.
102 LOG.trace("Executing ListenenableFuture Runnable on this thread: {}",
103 Thread.currentThread().getName());
109 private static final Logger LOG = LoggerFactory.getLogger(AsyncNotifyingListenableFutureTask.class);
112 * ThreadLocal used to detect if the task completion thread is running the listeners.
114 private static final SettableBooleanThreadLocal ON_TASK_COMPLETION_THREAD_TL = new SettableBooleanThreadLocal();
117 * The execution list to hold our listeners.
119 private final ExecutionList executionList = new ExecutionList();
121 private AsyncNotifyingListenableFutureTask(final Callable<V> callable) {
125 private AsyncNotifyingListenableFutureTask(final Runnable runnable, @Nullable final V result) {
126 super(runnable, result);
130 * Creates an {@code AsyncListenableFutureTask} that will upon running, execute the given
133 * @param callable the callable task
134 * @param listenerExecutor the executor used to run listener callbacks asynchronously.
135 * If null, no executor is used.
137 public static <V> AsyncNotifyingListenableFutureTask<V> create(final Callable<V> callable,
138 @Nullable final Executor listenerExecutor) {
139 if (listenerExecutor == null) {
140 return new AsyncNotifyingListenableFutureTask<>(callable);
142 return new DelegatingAsyncNotifyingListenableFutureTask<>(callable, listenerExecutor);
146 * Creates a {@code AsyncListenableFutureTask} that will upon running, execute the
147 * given {@code Runnable}, and arrange that {@code get} will return the
148 * given result on successful completion.
150 * @param runnable the runnable task
151 * @param result the result to return on successful completion.
152 * @param listenerExecutor the executor used to run listener callbacks asynchronously.
153 * If null, no executor is used.
155 public static <V> AsyncNotifyingListenableFutureTask<V> create(final Runnable runnable, @Nullable final V result,
156 @Nullable final Executor listenerExecutor) {
157 if (listenerExecutor == null) {
158 return new AsyncNotifyingListenableFutureTask<>(runnable, result);
160 return new DelegatingAsyncNotifyingListenableFutureTask<>(runnable, result, listenerExecutor);
164 public void addListener(@Nonnull final Runnable listener, final Executor executor) {
165 executionList.add(listener, executor);
169 * Remove the state which may have attached to the calling thread. If no state
170 * was attached this method does nothing.
172 public static void cleanStateForCurrentThread() {
173 ON_TASK_COMPLETION_THREAD_TL.remove();
177 * Called by the base class when the future result is set. We invoke our listeners.
180 protected void done() {
181 final SettableBoolean b = ON_TASK_COMPLETION_THREAD_TL.get();
185 executionList.execute();