BUG-1787: cleanup AsyncNotifyingListenableFutureTask
[yangtools.git] / common / util / src / main / java / org / opendaylight / yangtools / util / concurrent / AsyncNotifyingListenableFutureTask.java
1 /*
2  * Copyright (c) 2014 Brocade Communications Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.yangtools.util.concurrent;
10
11 import com.google.common.base.Preconditions;
12 import com.google.common.util.concurrent.ExecutionList;
13 import com.google.common.util.concurrent.ListenableFuture;
14 import com.google.common.util.concurrent.ListenableFutureTask;
15
16 import java.util.concurrent.Callable;
17 import java.util.concurrent.Executor;
18 import java.util.concurrent.FutureTask;
19
20 import javax.annotation.Nonnull;
21 import javax.annotation.Nullable;
22
23 import org.slf4j.Logger;
24 import org.slf4j.LoggerFactory;
25
26 /**
27  * A {@link FutureTask} that also implements the {@link ListenableFuture} interface similar to
28  * guava's {@link ListenableFutureTask}. This class differs from ListenableFutureTask in that it
29  * allows an {@link Executor} to be specified on construction that is used to execute listener
30  * callback Runnables, registered via {@link #addListener}, asynchronously when this task completes.
31  * This is useful when you want to guarantee listener executions are off-loaded onto another thread
32  * to avoid blocking the thread that completed this task, as a common use case is to pass an
33  * executor that runs tasks in the same thread as the caller (ie MoreExecutors#sameThreadExecutor)
34  * to {@link #addListener}.
35  * <p>
36  * Note: the Executor specified on construction does not replace the Executor specified in
37  * {@link #addListener}. The latter Executor is still used however, if it is detected that the
38  * listener Runnable would execute in the thread that completed this task, the listener
39  * is executed on Executor specified on construction.
40  *
41  * Also note that the use of this task may attach some (small) amount of state to the threads
42  * interacting with it. That state will not be detached automatically, but you can use
43  *  {@link #cleanStateForCurrentThread()} to clean it up.
44  *
45  * @author Thomas Pantelis
46  * @author Robert Varga
47  *
48  * @param <V> the Future result value type
49  */
50 public class AsyncNotifyingListenableFutureTask<V> extends FutureTask<V> implements ListenableFuture<V> {
51     private static final class DelegatingAsyncNotifyingListenableFutureTask<V> extends AsyncNotifyingListenableFutureTask<V> {
52         /**
53          * The executor used to run listener callbacks.
54          */
55         private final Executor listenerExecutor;
56
57         private DelegatingAsyncNotifyingListenableFutureTask(final Callable<V> callable, @Nullable final Executor listenerExecutor) {
58             super(callable);
59             this.listenerExecutor = Preconditions.checkNotNull(listenerExecutor);
60         }
61
62         private DelegatingAsyncNotifyingListenableFutureTask(final Runnable runnable, @Nullable final V result,
63                 @Nullable final Executor listenerExecutor) {
64             super(runnable, result);
65             this.listenerExecutor = Preconditions.checkNotNull(listenerExecutor);
66         }
67
68         @Override
69         public void addListener(final Runnable listener, final Executor executor) {
70             // Wrap the listener Runnable in a DelegatingRunnable. If the specified executor is one that
71             // runs tasks in the same thread as the caller submitting the task
72             // (e.g. {@link com.google.common.util.concurrent.MoreExecutors#sameThreadExecutor}) and the
73             // listener is executed from the #done method, then the DelegatingRunnable will detect this
74             // via the ThreadLocal and submit the listener Runnable to the listenerExecutor.
75             //
76             // On the other hand, if this task is already complete, the call to ExecutionList#add in
77             // superclass will execute the listener Runnable immediately and, since the ThreadLocal won't be set,
78             // the DelegatingRunnable will run the listener Runnable inline.
79             super.addListener(new DelegatingRunnable(listener, listenerExecutor), executor);
80         }
81     }
82
83     private static final class DelegatingRunnable implements Runnable {
84         private final Runnable delegate;
85         private final Executor executor;
86
87         DelegatingRunnable(final Runnable delegate, final Executor executor) {
88             this.delegate = Preconditions.checkNotNull(delegate);
89             this.executor = Preconditions.checkNotNull(executor);
90         }
91
92         @Override
93         public void run() {
94             if (ON_TASK_COMPLETION_THREAD_TL.get().isSet()) {
95                 // We're running on the task completion thread so off-load to the executor.
96                 LOG.trace("Submitting ListenenableFuture Runnable from thread {} to executor {}",
97                         Thread.currentThread().getName(), executor);
98                 executor.execute(delegate);
99             } else {
100                 // We're not running on the task completion thread so run the delegate inline.
101                 LOG.trace("Executing ListenenableFuture Runnable on this thread: {}",
102                         Thread.currentThread().getName());
103                 delegate.run();
104             }
105         }
106     }
107
108     private static final Logger LOG = LoggerFactory.getLogger(AsyncNotifyingListenableFutureTask.class);
109
110     /**
111      * ThreadLocal used to detect if the task completion thread is running the listeners.
112      */
113     private static final SettableBooleanThreadLocal ON_TASK_COMPLETION_THREAD_TL = new SettableBooleanThreadLocal();
114
115     /**
116      *  The execution list to hold our listeners.
117      */
118     private final ExecutionList executionList = new ExecutionList();
119
120     private AsyncNotifyingListenableFutureTask(final Callable<V> callable) {
121         super(callable);
122     }
123
124     private AsyncNotifyingListenableFutureTask(final Runnable runnable, @Nullable final V result) {
125         super(runnable, result);
126     }
127
128     /**
129      * Creates an {@code AsyncListenableFutureTask} that will upon running, execute the given
130      * {@code Callable}.
131      *
132      * @param callable the callable task
133      * @param listenerExecutor the executor used to run listener callbacks asynchronously.
134      *                         If null, no executor is used.
135      */
136     public static <V> AsyncNotifyingListenableFutureTask<V> create(final Callable<V> callable,
137             @Nullable final Executor listenerExecutor) {
138         if (listenerExecutor != null) {
139             return new DelegatingAsyncNotifyingListenableFutureTask<V>(callable, listenerExecutor);
140         } else {
141             return new AsyncNotifyingListenableFutureTask<V>(callable);
142         }
143     }
144
145     /**
146      * Creates a {@code AsyncListenableFutureTask} that will upon running, execute the
147      * given {@code Runnable}, and arrange that {@code get} will return the
148      * given result on successful completion.
149      *
150      * @param runnable the runnable task
151      * @param result the result to return on successful completion.
152      * @param listenerExecutor the executor used to run listener callbacks asynchronously.
153      *                         If null, no executor is used.
154      */
155     public static <V> AsyncNotifyingListenableFutureTask<V> create(final Runnable runnable, @Nullable final V result,
156             @Nullable final Executor listenerExecutor) {
157         if (listenerExecutor != null) {
158             return new DelegatingAsyncNotifyingListenableFutureTask<V>(runnable, result, listenerExecutor);
159         } else {
160             return new AsyncNotifyingListenableFutureTask<V>(runnable, result);
161         }
162     }
163
164     @Override
165     public void addListener(@Nonnull final Runnable listener, final Executor executor) {
166         executionList.add(listener, executor);
167     }
168
169     /**
170      * Remove the state which may have attached to the calling thread. If no state
171      * was attached this method does nothing.
172      */
173     public static void cleanStateForCurrentThread() {
174         ON_TASK_COMPLETION_THREAD_TL.remove();
175     }
176
177     /**
178      * Called by the base class when the future result is set. We invoke our listeners.
179      */
180     @Override
181     protected void done() {
182         final SettableBoolean b = ON_TASK_COMPLETION_THREAD_TL.get();
183         b.set();
184
185         try {
186             executionList.execute();
187         } finally {
188             b.reset();
189         }
190     }
191 }