53629b033055c47f9d84018c240888e2226df7fd
[yangtools.git] / common / util / src / main / java / org / opendaylight / yangtools / util / concurrent / AsyncNotifyingListenableFutureTask.java
1 /*
2  * Copyright (c) 2014 Brocade Communications Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.yangtools.util.concurrent;
10
11 import com.google.common.base.Preconditions;
12 import com.google.common.util.concurrent.ExecutionList;
13 import com.google.common.util.concurrent.ListenableFuture;
14 import com.google.common.util.concurrent.ListenableFutureTask;
15 import java.util.concurrent.Callable;
16 import java.util.concurrent.Executor;
17 import java.util.concurrent.FutureTask;
18 import javax.annotation.Nonnull;
19 import javax.annotation.Nullable;
20 import org.slf4j.Logger;
21 import org.slf4j.LoggerFactory;
22
23 /**
24  * A {@link FutureTask} that also implements the {@link ListenableFuture} interface similar to
25  * guava's {@link ListenableFutureTask}. This class differs from ListenableFutureTask in that it
26  * allows an {@link Executor} to be specified on construction that is used to execute listener
27  * callback Runnables, registered via {@link #addListener}, asynchronously when this task completes.
28  * This is useful when you want to guarantee listener executions are off-loaded onto another thread
29  * to avoid blocking the thread that completed this task, as a common use case is to pass an
30  * executor that runs tasks in the same thread as the caller (ie MoreExecutors#sameThreadExecutor)
31  * to {@link #addListener}.
32  *
33  * <p>Note: the Executor specified on construction does not replace the Executor specified in
34  * {@link #addListener}. The latter Executor is still used however, if it is detected that the
35  * listener Runnable would execute in the thread that completed this task, the listener
36  * is executed on Executor specified on construction.
37  *
38  * <p>Also note that the use of this task may attach some (small) amount of state to the threads
39  * interacting with it. That state will not be detached automatically, but you can use
40  *  {@link #cleanStateForCurrentThread()} to clean it up.
41  *
42  * @author Thomas Pantelis
43  * @author Robert Varga
44  *
45  * @param <V> the Future result value type
46  */
47 public class AsyncNotifyingListenableFutureTask<V> extends FutureTask<V> implements ListenableFuture<V> {
48
49     private static final class DelegatingAsyncNotifyingListenableFutureTask<V>
50             extends AsyncNotifyingListenableFutureTask<V> {
51
52         /**
53          * The executor used to run listener callbacks.
54          */
55         private final Executor listenerExecutor;
56
57         private DelegatingAsyncNotifyingListenableFutureTask(final Callable<V> callable,
58                 @Nullable final Executor listenerExecutor) {
59             super(callable);
60             this.listenerExecutor = Preconditions.checkNotNull(listenerExecutor);
61         }
62
63         private DelegatingAsyncNotifyingListenableFutureTask(final Runnable runnable, @Nullable final V result,
64                 @Nullable final Executor listenerExecutor) {
65             super(runnable, result);
66             this.listenerExecutor = Preconditions.checkNotNull(listenerExecutor);
67         }
68
69         @Override
70         public void addListener(final Runnable listener, final Executor executor) {
71             // Wrap the listener Runnable in a DelegatingRunnable. If the specified executor is one that
72             // runs tasks in the same thread as the caller submitting the task
73             // (e.g. {@link com.google.common.util.concurrent.MoreExecutors#sameThreadExecutor}) and the
74             // listener is executed from the #done method, then the DelegatingRunnable will detect this
75             // via the ThreadLocal and submit the listener Runnable to the listenerExecutor.
76             //
77             // On the other hand, if this task is already complete, the call to ExecutionList#add in
78             // superclass will execute the listener Runnable immediately and, since the ThreadLocal won't be set,
79             // the DelegatingRunnable will run the listener Runnable inline.
80             super.addListener(new DelegatingRunnable(listener, listenerExecutor), executor);
81         }
82     }
83
84     private static final class DelegatingRunnable implements Runnable {
85         private final Runnable delegate;
86         private final Executor executor;
87
88         DelegatingRunnable(final Runnable delegate, final Executor executor) {
89             this.delegate = Preconditions.checkNotNull(delegate);
90             this.executor = Preconditions.checkNotNull(executor);
91         }
92
93         @Override
94         public void run() {
95             if (ON_TASK_COMPLETION_THREAD_TL.get().isSet()) {
96                 // We're running on the task completion thread so off-load to the executor.
97                 LOG.trace("Submitting ListenenableFuture Runnable from thread {} to executor {}",
98                         Thread.currentThread().getName(), executor);
99                 executor.execute(delegate);
100             } else {
101                 // We're not running on the task completion thread so run the delegate inline.
102                 LOG.trace("Executing ListenenableFuture Runnable on this thread: {}",
103                         Thread.currentThread().getName());
104                 delegate.run();
105             }
106         }
107     }
108
109     private static final Logger LOG = LoggerFactory.getLogger(AsyncNotifyingListenableFutureTask.class);
110
111     /**
112      * ThreadLocal used to detect if the task completion thread is running the listeners.
113      */
114     private static final SettableBooleanThreadLocal ON_TASK_COMPLETION_THREAD_TL = new SettableBooleanThreadLocal();
115
116     /**
117      *  The execution list to hold our listeners.
118      */
119     private final ExecutionList executionList = new ExecutionList();
120
121     private AsyncNotifyingListenableFutureTask(final Callable<V> callable) {
122         super(callable);
123     }
124
125     private AsyncNotifyingListenableFutureTask(final Runnable runnable, @Nullable final V result) {
126         super(runnable, result);
127     }
128
129     /**
130      * Creates an {@code AsyncListenableFutureTask} that will upon running, execute the given
131      * {@code Callable}.
132      *
133      * @param callable the callable task
134      * @param listenerExecutor the executor used to run listener callbacks asynchronously.
135      *                         If null, no executor is used.
136      */
137     public static <V> AsyncNotifyingListenableFutureTask<V> create(final Callable<V> callable,
138             @Nullable final Executor listenerExecutor) {
139         if (listenerExecutor == null) {
140             return new AsyncNotifyingListenableFutureTask<>(callable);
141         }
142         return new DelegatingAsyncNotifyingListenableFutureTask<>(callable, listenerExecutor);
143     }
144
145     /**
146      * Creates a {@code AsyncListenableFutureTask} that will upon running, execute the
147      * given {@code Runnable}, and arrange that {@code get} will return the
148      * given result on successful completion.
149      *
150      * @param runnable the runnable task
151      * @param result the result to return on successful completion.
152      * @param listenerExecutor the executor used to run listener callbacks asynchronously.
153      *                         If null, no executor is used.
154      */
155     public static <V> AsyncNotifyingListenableFutureTask<V> create(final Runnable runnable, @Nullable final V result,
156             @Nullable final Executor listenerExecutor) {
157         if (listenerExecutor == null) {
158             return new AsyncNotifyingListenableFutureTask<>(runnable, result);
159         }
160         return new DelegatingAsyncNotifyingListenableFutureTask<>(runnable, result, listenerExecutor);
161     }
162
163     @Override
164     public void addListener(@Nonnull final Runnable listener, final Executor executor) {
165         executionList.add(listener, executor);
166     }
167
168     /**
169      * Remove the state which may have attached to the calling thread. If no state
170      * was attached this method does nothing.
171      */
172     public static void cleanStateForCurrentThread() {
173         ON_TASK_COMPLETION_THREAD_TL.remove();
174     }
175
176     /**
177      * Called by the base class when the future result is set. We invoke our listeners.
178      */
179     @Override
180     protected void done() {
181         final SettableBoolean b = ON_TASK_COMPLETION_THREAD_TL.get();
182         b.set();
183
184         try {
185             executionList.execute();
186         } finally {
187             b.reset();
188         }
189     }
190 }