8cdbc2c41373827e5f26baab4fe544f2e87d7f9d
[yangtools.git] / common / util / src / main / java / org / opendaylight / yangtools / util / concurrent / DeadlockDetectingListeningExecutorService.java
1 /*
2  * Copyright (c) 2014 Brocade Communications Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.yangtools.util.concurrent;
10
11 import com.google.common.base.Preconditions;
12 import com.google.common.base.Supplier;
13 import com.google.common.util.concurrent.ForwardingListenableFuture;
14 import com.google.common.util.concurrent.ListenableFuture;
15 import java.util.concurrent.Callable;
16 import java.util.concurrent.ExecutionException;
17 import java.util.concurrent.Executor;
18 import java.util.concurrent.ExecutorService;
19 import java.util.concurrent.TimeUnit;
20 import java.util.concurrent.TimeoutException;
21 import javax.annotation.Nonnull;
22 import javax.annotation.Nullable;
23
24 /**
25  * An implementation of ListeningExecutorService that attempts to detect deadlock scenarios that
26  * could occur if clients invoke the returned Future's <code>get</code> methods synchronously.
27  *
28  * <p>Deadlock scenarios are most apt to occur with a backing single-threaded executor where setting of
29  * the Future's result is executed on the single thread. Here's a scenario:
30  * <ul>
31  * <li>Client code is currently executing in an executor's single thread.</li>
32  * <li>The client submits another task to the same executor.</li>
33  * <li>The client calls <code>get()</code> synchronously on the returned Future</li>
34  * </ul>
35  * The second submitted task will never execute since the single thread is currently executing
36  * the client code which is blocked waiting for the submitted task to complete. Thus, deadlock has
37  * occurred.
38  *
39  * <p>This class prevents this scenario via the use of a ThreadLocal variable. When a task is invoked,
40  * the ThreadLocal is set and, when a task completes, the ThreadLocal is cleared. Futures returned
41  * from this class override the <code>get</code> methods to check if the ThreadLocal is set. If it is,
42  * an ExecutionException is thrown with a custom cause.
43  *
44  * <p>Note that the ThreadLocal is not removed automatically, so some state may be left hanging off of
45  * threads which have encountered this class. If you need to clean that state up, use
46  * {@link #cleanStateForCurrentThread()}.
47  *
48  * @author Thomas Pantelis
49  * @author Robert Varga
50  */
51 public class DeadlockDetectingListeningExecutorService extends AsyncNotifyingListeningExecutorService {
52     /*
53      * We cannot use a static field simply because our API contract allows nesting, which means some
54      * tasks may be submitted to underlay and some to overlay service -- and the two cases need to
55      * be discerned reliably.
56      */
57     private final SettableBooleanThreadLocal deadlockDetector = new SettableBooleanThreadLocal();
58     private final Supplier<Exception> deadlockExceptionFunction;
59
60     /**
61      * Constructor.
62      *
63      * @param delegate the backing ExecutorService.
64      * @param deadlockExceptionSupplier Supplier that returns an Exception instance to set as the
65      *             cause of the ExecutionException when a deadlock is detected.
66      */
67     public DeadlockDetectingListeningExecutorService(final ExecutorService delegate,
68             @Nonnull final Supplier<Exception> deadlockExceptionSupplier) {
69         this(delegate, deadlockExceptionSupplier, null);
70     }
71
72     /**
73      * Constructor.
74      *
75      * @param delegate the backing ExecutorService.
76      * @param deadlockExceptionSupplier Supplier that returns an Exception instance to set as the
77      *             cause of the ExecutionException when a deadlock is detected.
78      * @param listenableFutureExecutor the executor used to run listener callbacks asynchronously.
79      *             If null, no executor is used.
80      */
81     public DeadlockDetectingListeningExecutorService(final ExecutorService delegate,
82             @Nonnull final Supplier<Exception> deadlockExceptionSupplier,
83             @Nullable final Executor listenableFutureExecutor ) {
84         super(delegate, listenableFutureExecutor);
85         this.deadlockExceptionFunction = Preconditions.checkNotNull(deadlockExceptionSupplier);
86     }
87
88     @Override
89     public void execute(final Runnable command) {
90         getDelegate().execute(wrapRunnable(command));
91     }
92
93     @Override
94     public <T> ListenableFuture<T> submit(final Callable<T> task) {
95         return wrapListenableFuture(super.submit(wrapCallable(task)));
96     }
97
98     @Override
99     public ListenableFuture<?> submit(final Runnable task) {
100         return wrapListenableFuture(super.submit(wrapRunnable(task)));
101     }
102
103     @Override
104     public <T> ListenableFuture<T> submit(final Runnable task, final T result) {
105         return wrapListenableFuture(super.submit(wrapRunnable(task), result));
106     }
107
108     /**
109      * Remove the state this instance may have attached to the calling thread. If no state
110      * was attached this method does nothing.
111      */
112     public void cleanStateForCurrentThread() {
113         deadlockDetector.remove();
114     }
115
116     private SettableBoolean primeDetector() {
117         final SettableBoolean b = deadlockDetector.get();
118         Preconditions.checkState(!b.isSet(), "Detector for {} has already been primed", this);
119         b.set();
120         return b;
121     }
122
123     private Runnable wrapRunnable(final Runnable task) {
124         return new Runnable() {
125             @Override
126             public void run() {
127                 final SettableBoolean b = primeDetector();
128                 try {
129                     task.run();
130                 } finally {
131                     b.reset();
132                 }
133             }
134         };
135     }
136
137     private <T> Callable<T> wrapCallable(final Callable<T> delagate) {
138         return new Callable<T>() {
139             @Override
140             public T call() throws Exception {
141                 final SettableBoolean b = primeDetector();
142                 try {
143                     return delagate.call();
144                 } finally {
145                     b.reset();
146                 }
147             }
148         };
149     }
150
151     private <T> ListenableFuture<T> wrapListenableFuture(final ListenableFuture<T> delegate) {
152         /*
153          * This creates a forwarding Future that overrides calls to get(...) to check, via the
154          * ThreadLocal, if the caller is doing a blocking call on a thread from this executor. If
155          * so, we detect this as a deadlock and throw an ExecutionException even though it may not
156          * be a deadlock if there are more than 1 thread in the pool. Either way, there's bad
157          * practice somewhere, either on the client side for doing a blocking call or in the
158          * framework's threading model.
159          */
160         return new ForwardingListenableFuture.SimpleForwardingListenableFuture<T>(delegate) {
161             @Override
162             public T get() throws InterruptedException, ExecutionException {
163                 checkDeadLockDetectorTL();
164                 return super.get();
165             }
166
167             @Override
168             public T get(final long timeout, final TimeUnit unit)
169                     throws InterruptedException, ExecutionException, TimeoutException {
170                 checkDeadLockDetectorTL();
171                 return super.get(timeout, unit);
172             }
173
174             void checkDeadLockDetectorTL() throws ExecutionException {
175                 if (deadlockDetector.get().isSet()) {
176                     throw new ExecutionException("A potential deadlock was detected.",
177                             deadlockExceptionFunction.get());
178                 }
179             }
180         };
181     }
182 }