BUG-1786: fixup DeadlockDetectingListeningExecutorService
[yangtools.git] / common / util / src / main / java / org / opendaylight / yangtools / util / concurrent / DeadlockDetectingListeningExecutorService.java
1 /*
2  * Copyright (c) 2014 Brocade Communications Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.yangtools.util.concurrent;
10
11 import com.google.common.base.Function;
12 import com.google.common.base.Preconditions;
13 import com.google.common.base.Supplier;
14 import com.google.common.util.concurrent.ForwardingListenableFuture;
15 import com.google.common.util.concurrent.ListenableFuture;
16
17 import java.util.concurrent.Callable;
18 import java.util.concurrent.ExecutionException;
19 import java.util.concurrent.Executor;
20 import java.util.concurrent.ExecutorService;
21 import java.util.concurrent.TimeUnit;
22 import java.util.concurrent.TimeoutException;
23
24 import javax.annotation.Nonnull;
25 import javax.annotation.Nullable;
26
27 /**
28  * An implementation of ListeningExecutorService that attempts to detect deadlock scenarios that
29  * could occur if clients invoke the returned Future's <ode>get</code> methods synchronously.
30  * <p>
31  * Deadlock scenarios are most apt to occur with a backing single-threaded executor where setting of
32  * the Future's result is executed on the single thread. Here's a scenario:
33  * <ul>
34  * <li>Client code is currently executing in an executor's single thread.</li>
35  * <li>The client submits another task to the same executor.</li>
36  * <li>The client calls <code>get()</code> synchronously on the returned Future</li>
37  * </ul>
38  * The second submitted task will never execute since the single thread is currently executing
39  * the client code which is blocked waiting for the submitted task to complete. Thus, deadlock has
40  * occurred.
41  * <p>
42  * This class prevents this scenario via the use of a ThreadLocal variable. When a task is invoked,
43  * the ThreadLocal is set and, when a task completes, the ThreadLocal is cleared. Futures returned
44  * from this class override the <code>get</code> methods to check if the ThreadLocal is set. If it is,
45  * an ExecutionException is thrown with a custom cause.
46  *
47  * Note that the ThreadLocal is not removed automatically, so some state may be left hanging off of
48  * threads which have encountered this class. If you need to clean that state up, use
49  * {@link #cleanStateForCurrentThread()}.
50  *
51  * @author Thomas Pantelis
52  * @author Robert Varga
53  */
54 public class DeadlockDetectingListeningExecutorService extends AsyncNotifyingListeningExecutorService {
55     /*
56      * We cannot use a static field simply because our API contract allows nesting, which means some
57      * tasks may be submitted to underlay and some to overlay service -- and the two cases need to
58      * be discerned reliably.
59      */
60     private final SettableBooleanThreadLocal deadlockDetector = new SettableBooleanThreadLocal();
61     private final Supplier<Exception> deadlockExceptionFunction;
62
63     // Compatibility wrapper, needs to be removed once the deprecated constructors are gone.
64     private static final class CompatExceptionSupplier implements Supplier<Exception> {
65         private final Function<Void, Exception> function;
66
67         private CompatExceptionSupplier(final Function<Void, Exception> function) {
68             this.function = Preconditions.checkNotNull(function);
69         }
70
71         @Override
72         public Exception get() {
73             return function.apply(null);
74         }
75     }
76
77     /**
78      * Constructor.
79      *
80      * @param delegate the backing ExecutorService.
81      * @param deadlockExceptionFunction Function that returns an Exception instance to set as the
82      *             cause of the ExecutionException when a deadlock is detected.
83      * @deprecated Use {@link #DeadlockDetectingListeningExecutorService(ExecutorService, Supplier)} instead.
84      */
85     @Deprecated
86     public DeadlockDetectingListeningExecutorService(final ExecutorService delegate,
87             final Function<Void, Exception> deadlockExceptionFunction) {
88         this(delegate, deadlockExceptionFunction, null);
89     }
90
91     /**
92      * Constructor.
93      *
94      * @param delegate the backing ExecutorService.
95      * @param deadlockExceptionFunction Function that returns an Exception instance to set as the
96      *             cause of the ExecutionException when a deadlock is detected.
97      * @param listenableFutureExecutor the executor used to run listener callbacks asynchronously.
98      *             If null, no executor is used.
99      * @deprecated Use {@link #DeadlockDetectingListeningExecutorService(ExecutorService, Supplier, Executor)} instead.
100      */
101     @Deprecated
102     public DeadlockDetectingListeningExecutorService(final ExecutorService delegate,
103             final Function<Void, Exception> deadlockExceptionFunction,
104             @Nullable final Executor listenableFutureExecutor) {
105         super(delegate, listenableFutureExecutor);
106         this.deadlockExceptionFunction = new CompatExceptionSupplier(deadlockExceptionFunction);
107     }
108
109     /**
110      * Constructor.
111      *
112      * @param delegate the backing ExecutorService.
113      * @param deadlockExceptionSupplier Supplier that returns an Exception instance to set as the
114      *             cause of the ExecutionException when a deadlock is detected.
115      */
116     public DeadlockDetectingListeningExecutorService(final ExecutorService delegate,
117             @Nonnull final Supplier<Exception> deadlockExceptionSupplier) {
118         this(delegate, deadlockExceptionSupplier, null);
119     }
120
121     /**
122      * Constructor.
123      *
124      * @param delegate the backing ExecutorService.
125      * @param deadlockExceptionSupplier Supplier that returns an Exception instance to set as the
126      *             cause of the ExecutionException when a deadlock is detected.
127      * @param listenableFutureExecutor the executor used to run listener callbacks asynchronously.
128      *             If null, no executor is used.
129      */
130     public DeadlockDetectingListeningExecutorService(final ExecutorService delegate,
131             @Nonnull final Supplier<Exception> deadlockExceptionSupplier,
132             @Nullable final Executor listenableFutureExecutor ) {
133         super(delegate, listenableFutureExecutor);
134         this.deadlockExceptionFunction = Preconditions.checkNotNull(deadlockExceptionSupplier);
135     }
136
137     @Override
138     public void execute(final Runnable command) {
139         getDelegate().execute(wrapRunnable(command));
140     }
141
142     @Override
143     public <T> ListenableFuture<T> submit(final Callable<T> task) {
144         return wrapListenableFuture(super.submit(wrapCallable(task)));
145     }
146
147     @Override
148     public ListenableFuture<?> submit(final Runnable task) {
149         return wrapListenableFuture(super.submit(wrapRunnable(task)));
150     }
151
152     @Override
153     public <T> ListenableFuture<T> submit(final Runnable task, final T result) {
154         return wrapListenableFuture(super.submit(wrapRunnable(task), result));
155     }
156
157     /**
158      * Remove the state this instance may have attached to the calling thread. If no state
159      * was attached this method does nothing.
160      */
161     public void cleanStateForCurrentThread() {
162         deadlockDetector.remove();
163     }
164
165     private SettableBoolean primeDetector() {
166         final SettableBoolean b = deadlockDetector.get();
167         Preconditions.checkState(!b.isSet(), "Detector for {} has already been primed", this);
168         b.set();
169         return b;
170     }
171
172     private Runnable wrapRunnable(final Runnable task) {
173         return new Runnable() {
174             @Override
175             public void run() {
176                 final SettableBoolean b = primeDetector();
177                 try {
178                     task.run();
179                 } finally {
180                     b.reset();
181                 }
182             }
183         };
184     }
185
186     private <T> Callable<T> wrapCallable(final Callable<T> delagate) {
187         return new Callable<T>() {
188             @Override
189             public T call() throws Exception {
190                 final SettableBoolean b = primeDetector();
191                 try {
192                     return delagate.call();
193                 } finally {
194                     b.reset();
195                 }
196             }
197         };
198     }
199
200     private <T> ListenableFuture<T> wrapListenableFuture(final ListenableFuture<T> delegate) {
201         /*
202          * This creates a forwarding Future that overrides calls to get(...) to check, via the
203          * ThreadLocal, if the caller is doing a blocking call on a thread from this executor. If
204          * so, we detect this as a deadlock and throw an ExecutionException even though it may not
205          * be a deadlock if there are more than 1 thread in the pool. Either way, there's bad
206          * practice somewhere, either on the client side for doing a blocking call or in the
207          * framework's threading model.
208          */
209         return new ForwardingListenableFuture.SimpleForwardingListenableFuture<T>(delegate) {
210             @Override
211             public T get() throws InterruptedException, ExecutionException {
212                 checkDeadLockDetectorTL();
213                 return super.get();
214             }
215
216             @Override
217             public T get(final long timeout, final TimeUnit unit)
218                     throws InterruptedException, ExecutionException, TimeoutException {
219                 checkDeadLockDetectorTL();
220                 return super.get(timeout, unit);
221             }
222
223             void checkDeadLockDetectorTL() throws ExecutionException {
224                 if (deadlockDetector.get().isSet()) {
225                     throw new ExecutionException("A potential deadlock was detected.",
226                             deadlockExceptionFunction.get());
227                 }
228             }
229         };
230     }
231 }