bd0919aecfc56e6d87f94046339df589a7b0c52b
[yangtools.git] / common / util / src / main / java / org / opendaylight / yangtools / util / concurrent / AsyncNotifyingListenableFutureTask.java
1 /*
2  * Copyright (c) 2014 Brocade Communications Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.yangtools.util.concurrent;
10
11 import com.google.common.base.Preconditions;
12 import com.google.common.util.concurrent.ExecutionList;
13 import com.google.common.util.concurrent.ListenableFuture;
14 import com.google.common.util.concurrent.ListenableFutureTask;
15 import java.util.concurrent.Callable;
16 import java.util.concurrent.Executor;
17 import java.util.concurrent.FutureTask;
18 import javax.annotation.Nonnull;
19 import javax.annotation.Nullable;
20 import org.slf4j.Logger;
21 import org.slf4j.LoggerFactory;
22
23 /**
24  * A {@link FutureTask} that also implements the {@link ListenableFuture} interface similar to
25  * guava's {@link ListenableFutureTask}. This class differs from ListenableFutureTask in that it
26  * allows an {@link Executor} to be specified on construction that is used to execute listener
27  * callback Runnables, registered via {@link #addListener}, asynchronously when this task completes.
28  * This is useful when you want to guarantee listener executions are off-loaded onto another thread
29  * to avoid blocking the thread that completed this task, as a common use case is to pass an
30  * executor that runs tasks in the same thread as the caller (ie MoreExecutors#sameThreadExecutor)
31  * to {@link #addListener}.
32  * <p>
33  * Note: the Executor specified on construction does not replace the Executor specified in
34  * {@link #addListener}. The latter Executor is still used however, if it is detected that the
35  * listener Runnable would execute in the thread that completed this task, the listener
36  * is executed on Executor specified on construction.
37  *
38  * Also note that the use of this task may attach some (small) amount of state to the threads
39  * interacting with it. That state will not be detached automatically, but you can use
40  *  {@link #cleanStateForCurrentThread()} to clean it up.
41  *
42  * @author Thomas Pantelis
43  * @author Robert Varga
44  *
45  * @param <V> the Future result value type
46  */
47 public class AsyncNotifyingListenableFutureTask<V> extends FutureTask<V> implements ListenableFuture<V> {
48     private static final class DelegatingAsyncNotifyingListenableFutureTask<V> extends AsyncNotifyingListenableFutureTask<V> {
49         /**
50          * The executor used to run listener callbacks.
51          */
52         private final Executor listenerExecutor;
53
54         private DelegatingAsyncNotifyingListenableFutureTask(final Callable<V> callable, @Nullable final Executor listenerExecutor) {
55             super(callable);
56             this.listenerExecutor = Preconditions.checkNotNull(listenerExecutor);
57         }
58
59         private DelegatingAsyncNotifyingListenableFutureTask(final Runnable runnable, @Nullable final V result,
60                 @Nullable final Executor listenerExecutor) {
61             super(runnable, result);
62             this.listenerExecutor = Preconditions.checkNotNull(listenerExecutor);
63         }
64
65         @Override
66         public void addListener(final Runnable listener, final Executor executor) {
67             // Wrap the listener Runnable in a DelegatingRunnable. If the specified executor is one that
68             // runs tasks in the same thread as the caller submitting the task
69             // (e.g. {@link com.google.common.util.concurrent.MoreExecutors#sameThreadExecutor}) and the
70             // listener is executed from the #done method, then the DelegatingRunnable will detect this
71             // via the ThreadLocal and submit the listener Runnable to the listenerExecutor.
72             //
73             // On the other hand, if this task is already complete, the call to ExecutionList#add in
74             // superclass will execute the listener Runnable immediately and, since the ThreadLocal won't be set,
75             // the DelegatingRunnable will run the listener Runnable inline.
76             super.addListener(new DelegatingRunnable(listener, listenerExecutor), executor);
77         }
78     }
79
80     private static final class DelegatingRunnable implements Runnable {
81         private final Runnable delegate;
82         private final Executor executor;
83
84         DelegatingRunnable(final Runnable delegate, final Executor executor) {
85             this.delegate = Preconditions.checkNotNull(delegate);
86             this.executor = Preconditions.checkNotNull(executor);
87         }
88
89         @Override
90         public void run() {
91             if (ON_TASK_COMPLETION_THREAD_TL.get().isSet()) {
92                 // We're running on the task completion thread so off-load to the executor.
93                 LOG.trace("Submitting ListenenableFuture Runnable from thread {} to executor {}",
94                         Thread.currentThread().getName(), executor);
95                 executor.execute(delegate);
96             } else {
97                 // We're not running on the task completion thread so run the delegate inline.
98                 LOG.trace("Executing ListenenableFuture Runnable on this thread: {}",
99                         Thread.currentThread().getName());
100                 delegate.run();
101             }
102         }
103     }
104
105     private static final Logger LOG = LoggerFactory.getLogger(AsyncNotifyingListenableFutureTask.class);
106
107     /**
108      * ThreadLocal used to detect if the task completion thread is running the listeners.
109      */
110     private static final SettableBooleanThreadLocal ON_TASK_COMPLETION_THREAD_TL = new SettableBooleanThreadLocal();
111
112     /**
113      *  The execution list to hold our listeners.
114      */
115     private final ExecutionList executionList = new ExecutionList();
116
117     private AsyncNotifyingListenableFutureTask(final Callable<V> callable) {
118         super(callable);
119     }
120
121     private AsyncNotifyingListenableFutureTask(final Runnable runnable, @Nullable final V result) {
122         super(runnable, result);
123     }
124
125     /**
126      * Creates an {@code AsyncListenableFutureTask} that will upon running, execute the given
127      * {@code Callable}.
128      *
129      * @param callable the callable task
130      * @param listenerExecutor the executor used to run listener callbacks asynchronously.
131      *                         If null, no executor is used.
132      */
133     public static <V> AsyncNotifyingListenableFutureTask<V> create(final Callable<V> callable,
134             @Nullable final Executor listenerExecutor) {
135         if (listenerExecutor != null) {
136             return new DelegatingAsyncNotifyingListenableFutureTask<>(callable, listenerExecutor);
137         } else {
138             return new AsyncNotifyingListenableFutureTask<>(callable);
139         }
140     }
141
142     /**
143      * Creates a {@code AsyncListenableFutureTask} that will upon running, execute the
144      * given {@code Runnable}, and arrange that {@code get} will return the
145      * given result on successful completion.
146      *
147      * @param runnable the runnable task
148      * @param result the result to return on successful completion.
149      * @param listenerExecutor the executor used to run listener callbacks asynchronously.
150      *                         If null, no executor is used.
151      */
152     public static <V> AsyncNotifyingListenableFutureTask<V> create(final Runnable runnable, @Nullable final V result,
153             @Nullable final Executor listenerExecutor) {
154         if (listenerExecutor != null) {
155             return new DelegatingAsyncNotifyingListenableFutureTask<>(runnable, result, listenerExecutor);
156         } else {
157             return new AsyncNotifyingListenableFutureTask<>(runnable, result);
158         }
159     }
160
161     @Override
162     public void addListener(@Nonnull final Runnable listener, final Executor executor) {
163         executionList.add(listener, executor);
164     }
165
166     /**
167      * Remove the state which may have attached to the calling thread. If no state
168      * was attached this method does nothing.
169      */
170     public static void cleanStateForCurrentThread() {
171         ON_TASK_COMPLETION_THREAD_TL.remove();
172     }
173
174     /**
175      * Called by the base class when the future result is set. We invoke our listeners.
176      */
177     @Override
178     protected void done() {
179         final SettableBoolean b = ON_TASK_COMPLETION_THREAD_TL.get();
180         b.set();
181
182         try {
183             executionList.execute();
184         } finally {
185             b.reset();
186         }
187     }
188 }