Merge "Rework data.api.InstanceIdentifier to work on Iterables"
[yangtools.git] / common / util / src / main / java / org / opendaylight / yangtools / util / concurrent / DeadlockDetectingListeningExecutorService.java
1 /*
2  * Copyright (c) 2014 Brocade Communications Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.yangtools.util.concurrent;
10
11 import static com.google.common.base.Preconditions.checkNotNull;
12
13 import com.google.common.base.Function;
14 import com.google.common.util.concurrent.AbstractListeningExecutorService;
15 import com.google.common.util.concurrent.ForwardingListenableFuture;
16 import com.google.common.util.concurrent.ListenableFuture;
17 import com.google.common.util.concurrent.ListenableFutureTask;
18
19 import java.util.List;
20 import java.util.concurrent.Callable;
21 import java.util.concurrent.ExecutionException;
22 import java.util.concurrent.ExecutorService;
23 import java.util.concurrent.TimeUnit;
24 import java.util.concurrent.TimeoutException;
25
26 /**
27  * An implementation of ListeningExecutorService that attempts to detect deadlock scenarios that
28  * could occur if clients invoke the returned Future's <ode>get</code> methods synchronously.
29  * <p>
30  * Deadlock scenarios are most apt to occur with a backing single-threaded executor where setting of
31  * the Future's result is executed on the single thread. Here's a scenario:
32  * <ul>
33  * <li>Client code is currently executing in an executor's single thread.</li>
34  * <li>The client submits another task to the same executor.</li>
35  * <li>The client calls <code>get()</code> synchronously on the returned Future</li>
36  * </ul>
37  * The second submitted task will never execute since the single thread is currently executing
38  * the client code which is blocked waiting for the submitted task to complete. Thus, deadlock has
39  * occurred.
40  * <p>
41  * This class prevents this scenario via the use of a ThreadLocal variable. When a task is invoked,
42  * the ThreadLocal is set and, when a task completes, the ThreadLocal is cleared. Futures returned
43  * from this class override the <code>get</code> methods to check if the ThreadLocal is set. If it is,
44  * an ExecutionException is thrown with a custom cause.
45  *
46  * @author Thomas Pantelis
47  */
48 public class DeadlockDetectingListeningExecutorService extends AbstractListeningExecutorService {
49     private final ThreadLocal<Boolean> deadlockDetector = new ThreadLocal<>();
50     private final Function<Void, Exception> deadlockExceptionFunction;
51     private final ExecutorService delegate;
52
53     /**
54      * Constructor.
55      *
56      * @param delegate the backing ExecutorService.
57      * @param deadlockExceptionFunction Function that returns an Exception instance to set as the
58      *             cause of the ExecutionException when a deadlock is detected.
59      */
60     public DeadlockDetectingListeningExecutorService(final ExecutorService delegate,
61             final Function<Void,Exception> deadlockExceptionFunction) {
62         this.delegate = checkNotNull(delegate);
63         this.deadlockExceptionFunction = checkNotNull(deadlockExceptionFunction);
64     }
65
66     @Override
67     public boolean awaitTermination(final long timeout, final TimeUnit unit) throws InterruptedException {
68         return delegate.awaitTermination(timeout, unit);
69     }
70
71     @Override
72     public boolean isShutdown() {
73         return delegate.isShutdown();
74     }
75
76     @Override
77     public boolean isTerminated() {
78         return delegate.isTerminated();
79     }
80
81     @Override
82     public void shutdown() {
83         delegate.shutdown();
84     }
85
86     @Override
87     public List<Runnable> shutdownNow() {
88         return delegate.shutdownNow();
89     }
90
91     @Override
92     public void execute(final Runnable command) {
93         delegate.execute(wrapRunnable(command));
94     }
95
96     @Override
97     public <T> ListenableFuture<T> submit(final Callable<T> task ) {
98         final ListenableFutureTask<T> futureTask = ListenableFutureTask.create(wrapCallable(task));
99         delegate.execute(futureTask);
100         return wrapListenableFuture(futureTask);
101     }
102
103     @Override
104     public ListenableFuture<?> submit( final Runnable task ) {
105         ListenableFutureTask<Void> futureTask = ListenableFutureTask.create(wrapRunnable(task), null);
106         delegate.execute(futureTask);
107         return wrapListenableFuture(futureTask);
108     }
109
110     @Override
111     public <T> ListenableFuture<T> submit(final Runnable task, final T result) {
112         ListenableFutureTask<T> futureTask = ListenableFutureTask.create(wrapRunnable(task), result);
113         delegate.execute(futureTask);
114         return wrapListenableFuture(futureTask);
115     }
116
117     private Runnable wrapRunnable(final Runnable task) {
118         return new Runnable() {
119             @Override
120             public void run() {
121                 deadlockDetector.set(Boolean.TRUE);
122                 try {
123                     task.run();
124                 } finally {
125                     deadlockDetector.set(null);
126                 }
127             }
128         };
129     }
130
131     private <T> Callable<T> wrapCallable(final Callable<T> delagate) {
132         return new Callable<T>() {
133             @Override
134             public T call() throws Exception {
135                 deadlockDetector.set(Boolean.TRUE);
136                 try {
137                     return delagate.call();
138                 } finally {
139                     deadlockDetector.set(null);
140                 }
141             }
142         };
143     }
144
145     private <T> ListenableFuture<T> wrapListenableFuture(final ListenableFuture<T> delegate ) {
146         /*
147          *  This creates a forwarding Future that overrides calls to get(...) to check, via the ThreadLocal,
148          * if the caller is doing a blocking call on a thread from this executor. If so, we detect this as
149          * a deadlock and throw an ExecutionException even though it may not be a deadlock if there are
150          * more than 1 thread in the pool. Either way, there's bad practice somewhere, either on the client
151          * side for doing a blocking call or in the framework's threading model.
152          */
153         return new ForwardingListenableFuture.SimpleForwardingListenableFuture<T>(delegate) {
154             @Override
155             public T get() throws InterruptedException, ExecutionException {
156                 checkDeadLockDetectorTL();
157                 return super.get();
158             }
159
160             @Override
161             public T get(final long timeout, final TimeUnit unit)
162                     throws InterruptedException, ExecutionException, TimeoutException {
163                 checkDeadLockDetectorTL();
164                 return super.get(timeout, unit);
165             }
166
167             void checkDeadLockDetectorTL() throws ExecutionException {
168                 if (deadlockDetector.get() != null) {
169                     throw new ExecutionException("A potential deadlock was detected.",
170                             deadlockExceptionFunction.apply(null));
171                 }
172             }
173         };
174     }
175 }