ec03529f7682febf6a5d264b10e6454f53166c93
[yangtools.git] / common / util / src / main / java / org / opendaylight / yangtools / util / concurrent / AsyncNotifyingListenableFutureTask.java
1 /*
2  * Copyright (c) 2014 Brocade Communications Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.yangtools.util.concurrent;
10
11 import static java.util.Objects.requireNonNull;
12
13 import com.google.common.util.concurrent.ExecutionList;
14 import com.google.common.util.concurrent.ListenableFuture;
15 import com.google.common.util.concurrent.ListenableFutureTask;
16 import java.util.concurrent.Callable;
17 import java.util.concurrent.Executor;
18 import java.util.concurrent.FutureTask;
19 import javax.annotation.Nonnull;
20 import javax.annotation.Nullable;
21 import org.slf4j.Logger;
22 import org.slf4j.LoggerFactory;
23
24 /**
25  * A {@link FutureTask} that also implements the {@link ListenableFuture} interface similar to
26  * guava's {@link ListenableFutureTask}. This class differs from ListenableFutureTask in that it
27  * allows an {@link Executor} to be specified on construction that is used to execute listener
28  * callback Runnables, registered via {@link #addListener}, asynchronously when this task completes.
29  * This is useful when you want to guarantee listener executions are off-loaded onto another thread
30  * to avoid blocking the thread that completed this task, as a common use case is to pass an
31  * executor that runs tasks in the same thread as the caller (ie MoreExecutors#sameThreadExecutor)
32  * to {@link #addListener}.
33  *
34  * <p>Note: the Executor specified on construction does not replace the Executor specified in
35  * {@link #addListener}. The latter Executor is still used however, if it is detected that the
36  * listener Runnable would execute in the thread that completed this task, the listener
37  * is executed on Executor specified on construction.
38  *
39  * <p>Also note that the use of this task may attach some (small) amount of state to the threads
40  * interacting with it. That state will not be detached automatically, but you can use
41  *  {@link #cleanStateForCurrentThread()} to clean it up.
42  *
43  * @author Thomas Pantelis
44  * @author Robert Varga
45  *
46  * @param <V> the Future result value type
47  */
48 public class AsyncNotifyingListenableFutureTask<V> extends FutureTask<V> implements ListenableFuture<V> {
49
50     private static final class DelegatingAsyncNotifyingListenableFutureTask<V>
51             extends AsyncNotifyingListenableFutureTask<V> {
52
53         /**
54          * The executor used to run listener callbacks.
55          */
56         private final Executor listenerExecutor;
57
58         private DelegatingAsyncNotifyingListenableFutureTask(final Callable<V> callable,
59                 @Nullable final Executor listenerExecutor) {
60             super(callable);
61             this.listenerExecutor = requireNonNull(listenerExecutor);
62         }
63
64         private DelegatingAsyncNotifyingListenableFutureTask(final Runnable runnable, @Nullable final V result,
65                 @Nullable final Executor listenerExecutor) {
66             super(runnable, result);
67             this.listenerExecutor = requireNonNull(listenerExecutor);
68         }
69
70         @Override
71         public void addListener(@Nonnull final Runnable listener, @Nonnull final Executor executor) {
72             // Wrap the listener Runnable in a DelegatingRunnable. If the specified executor is one that
73             // runs tasks in the same thread as the caller submitting the task
74             // (e.g. {@link com.google.common.util.concurrent.MoreExecutors#sameThreadExecutor}) and the
75             // listener is executed from the #done method, then the DelegatingRunnable will detect this
76             // via the ThreadLocal and submit the listener Runnable to the listenerExecutor.
77             //
78             // On the other hand, if this task is already complete, the call to ExecutionList#add in
79             // superclass will execute the listener Runnable immediately and, since the ThreadLocal won't be set,
80             // the DelegatingRunnable will run the listener Runnable inline.
81             super.addListener(new DelegatingRunnable(listener, listenerExecutor), executor);
82         }
83     }
84
85     private static final class DelegatingRunnable implements Runnable {
86         private final Runnable delegate;
87         private final Executor executor;
88
89         DelegatingRunnable(final Runnable delegate, final Executor executor) {
90             this.delegate = requireNonNull(delegate);
91             this.executor = requireNonNull(executor);
92         }
93
94         @Override
95         public void run() {
96             if (ON_TASK_COMPLETION_THREAD_TL.get().isSet()) {
97                 // We're running on the task completion thread so off-load to the executor.
98                 LOG.trace("Submitting ListenenableFuture Runnable from thread {} to executor {}",
99                         Thread.currentThread().getName(), executor);
100                 executor.execute(delegate);
101             } else {
102                 // We're not running on the task completion thread so run the delegate inline.
103                 LOG.trace("Executing ListenenableFuture Runnable on this thread: {}",
104                         Thread.currentThread().getName());
105                 delegate.run();
106             }
107         }
108     }
109
110     private static final Logger LOG = LoggerFactory.getLogger(AsyncNotifyingListenableFutureTask.class);
111
112     /**
113      * ThreadLocal used to detect if the task completion thread is running the listeners.
114      */
115     private static final SettableBooleanThreadLocal ON_TASK_COMPLETION_THREAD_TL = new SettableBooleanThreadLocal();
116
117     /**
118      *  The execution list to hold our listeners.
119      */
120     private final ExecutionList executionList = new ExecutionList();
121
122     private AsyncNotifyingListenableFutureTask(final Callable<V> callable) {
123         super(callable);
124     }
125
126     private AsyncNotifyingListenableFutureTask(final Runnable runnable, @Nullable final V result) {
127         super(runnable, result);
128     }
129
130     /**
131      * Creates an {@code AsyncListenableFutureTask} that will upon running, execute the given
132      * {@code Callable}.
133      *
134      * @param callable the callable task
135      * @param listenerExecutor the executor used to run listener callbacks asynchronously.
136      *                         If null, no executor is used.
137      */
138     public static <V> AsyncNotifyingListenableFutureTask<V> create(final Callable<V> callable,
139             @Nullable final Executor listenerExecutor) {
140         if (listenerExecutor == null) {
141             return new AsyncNotifyingListenableFutureTask<>(callable);
142         }
143         return new DelegatingAsyncNotifyingListenableFutureTask<>(callable, listenerExecutor);
144     }
145
146     /**
147      * Creates a {@code AsyncListenableFutureTask} that will upon running, execute the
148      * given {@code Runnable}, and arrange that {@code get} will return the
149      * given result on successful completion.
150      *
151      * @param runnable the runnable task
152      * @param result the result to return on successful completion.
153      * @param listenerExecutor the executor used to run listener callbacks asynchronously.
154      *                         If null, no executor is used.
155      */
156     public static <V> AsyncNotifyingListenableFutureTask<V> create(final Runnable runnable, @Nullable final V result,
157             @Nullable final Executor listenerExecutor) {
158         if (listenerExecutor == null) {
159             return new AsyncNotifyingListenableFutureTask<>(runnable, result);
160         }
161         return new DelegatingAsyncNotifyingListenableFutureTask<>(runnable, result, listenerExecutor);
162     }
163
164     @Override
165     public void addListener(@Nonnull final Runnable listener, @Nonnull final Executor executor) {
166         executionList.add(listener, executor);
167     }
168
169     /**
170      * Remove the state which may have attached to the calling thread. If no state
171      * was attached this method does nothing.
172      */
173     public static void cleanStateForCurrentThread() {
174         ON_TASK_COMPLETION_THREAD_TL.remove();
175     }
176
177     /**
178      * Called by the base class when the future result is set. We invoke our listeners.
179      */
180     @Override
181     protected void done() {
182         final SettableBoolean b = ON_TASK_COMPLETION_THREAD_TL.get();
183         b.set();
184
185         try {
186             executionList.execute();
187         } finally {
188             b.reset();
189         }
190     }
191 }