69c94f32a35cfc1e7b1ec9bf71e6287bd88b7acc
[yangtools.git] / common / util / src / main / java / org / opendaylight / yangtools / util / concurrent / AsyncNotifyingListenableFutureTask.java
1 /*
2  * Copyright (c) 2014 Brocade Communications Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.yangtools.util.concurrent;
10
11 import java.util.concurrent.Callable;
12 import java.util.concurrent.Executor;
13 import java.util.concurrent.FutureTask;
14
15 import javax.annotation.Nullable;
16
17 import org.slf4j.Logger;
18 import org.slf4j.LoggerFactory;
19
20 import com.google.common.util.concurrent.ExecutionList;
21 import com.google.common.util.concurrent.ListenableFuture;
22
23 /**
24  * A {@link FutureTask} that also implements the {@link ListenableFuture} interface similar to
25  * guava's {@link ListenableFutureTask}. This class differs from ListenableFutureTask in that it
26  * allows an {@link Executor} to be specified on construction that is used to execute listener
27  * callback Runnables, registered via {@link #addListener}, asynchronously when this task completes.
28  * This is useful when you want to guarantee listener executions are off-loaded onto another thread
29  * to avoid blocking the thread that completed this task, as a common use case is to pass an
30  * executor that runs tasks in the same thread as the caller (ie MoreExecutors#sameThreadExecutor)
31  * to {@link #addListener}.
32  * <p>
33  * Note: the Executor specified on construction does not replace the Executor specified in
34  * {@link #addListener}. The latter Executor is still used however, if it is detected that the
35  * listener Runnable would execute in the thread that completed this task, the listener
36  * is executed on Executor specified on construction.
37  *
38  * @author Thomas Pantelis
39  *
40  * @param <V> the Future result value type
41  */
42 public class AsyncNotifyingListenableFutureTask<V> extends FutureTask<V> implements ListenableFuture<V> {
43
44     private static final Logger LOG = LoggerFactory.getLogger( AsyncNotifyingListenableFutureTask.class );
45
46     /**
47      * ThreadLocal used to detect if the task completion thread is running the listeners.
48      */
49     private static final ThreadLocal<Boolean> ON_TASK_COMPLETION_THREAD_TL = new ThreadLocal<>();
50
51     /**
52      *  The execution list to hold our listeners.
53      */
54     private final ExecutionList executionList = new ExecutionList();
55
56     /**
57      * The executor used to run listener callbacks.
58      */
59     private final Executor listenerExecutor;
60
61     private AsyncNotifyingListenableFutureTask( Callable<V> callable, @Nullable Executor listenerExecutor ) {
62         super( callable );
63         this.listenerExecutor = listenerExecutor;
64     }
65
66     private AsyncNotifyingListenableFutureTask( Runnable runnable, @Nullable V result,
67             @Nullable Executor listenerExecutor ) {
68         super( runnable, result );
69         this.listenerExecutor = listenerExecutor;
70     }
71
72     /**
73      * Creates an {@code AsyncListenableFutureTask} that will upon running, execute the given
74      * {@code Callable}.
75      *
76      * @param callable the callable task
77      * @param listenerExecutor the executor used to run listener callbacks asynchronously.
78      *                         If null, no executor is used.
79      */
80     public static <V> AsyncNotifyingListenableFutureTask<V> create( Callable<V> callable,
81             @Nullable Executor listenerExecutor ) {
82       return new AsyncNotifyingListenableFutureTask<V>( callable, listenerExecutor );
83     }
84
85     /**
86      * Creates a {@code AsyncListenableFutureTask} that will upon running, execute the
87      * given {@code Runnable}, and arrange that {@code get} will return the
88      * given result on successful completion.
89      *
90      * @param runnable the runnable task
91      * @param result the result to return on successful completion.
92      * @param listenerExecutor the executor used to run listener callbacks asynchronously.
93      *                         If null, no executor is used.
94      */
95     public static <V> AsyncNotifyingListenableFutureTask<V> create( Runnable runnable, @Nullable V result,
96             @Nullable Executor listenerExecutor ) {
97       return new AsyncNotifyingListenableFutureTask<V>( runnable, result, listenerExecutor );
98     }
99
100     @Override
101     public void addListener( Runnable listener, Executor executor ) {
102         // If a listenerExecutor was specified on construction, wrap the listener Runnable in a
103         // DelegatingRunnable. If the specified executor is one that runs tasks in the same thread
104         // as the caller submitting the task (eg MoreExecutors#sameThreadExecutor) and the
105         // listener is executed from the #done method, then the DelegatingRunnable will detect this
106         // via the ThreadLocal and submit the listener Runnable to the listenerExecutor.
107         //
108         // On the other hand, if this task is already complete, the call to ExecutionList#add below
109         // will execute the listener Runnable immediately and, since the ThreadLocal won't be set,
110         // the DelegatingRunnable will run the listener Runnable inline.
111
112         executionList.add( listenerExecutor == null ? listener :
113             new DelegatingRunnable( listener, listenerExecutor ), executor );
114     }
115
116     /**
117      * Called by the base class when the future result is set. We invoke our listeners.
118      */
119     @Override
120     protected void done() {
121         ON_TASK_COMPLETION_THREAD_TL.set( Boolean.TRUE );
122         try {
123             executionList.execute();
124         } finally {
125             ON_TASK_COMPLETION_THREAD_TL.remove();
126         }
127     }
128
129     private static class DelegatingRunnable implements Runnable {
130
131         private final Runnable delegate;
132         private final Executor executor;
133
134         DelegatingRunnable( Runnable delegate, Executor executor ) {
135             this.delegate = delegate;
136             this.executor = executor;
137         }
138
139         @Override
140         public void run() {
141             if( ON_TASK_COMPLETION_THREAD_TL.get() == null ) {
142                 // We're not running on the task completion thread so run the delegate inline.
143                 LOG.trace( "Executing ListenenableFuture Runnable on this thread: {}",
144                         Thread.currentThread().getName() );
145                 delegate.run();
146             } else {
147                 // We're running on the task completion thread so off-load to the executor.
148                 LOG.trace( "Submitting ListenenableFuture Runnable to the listenerExecutor",
149                         Thread.currentThread().getName() );
150                 executor.execute( delegate );
151             }
152         }
153     }
154 }