/* * Copyright (c) 2014 Brocade Communications Systems, Inc. and others. All rights reserved. * * This program and the accompanying materials are made available under the * terms of the Eclipse Public License v1.0 which accompanies this distribution, * and is available at http://www.eclipse.org/legal/epl-v10.html */ package org.opendaylight.yangtools.util.concurrent; import static com.google.common.base.Preconditions.checkNotNull; import com.google.common.base.Function; import com.google.common.util.concurrent.ForwardingListenableFuture; import com.google.common.util.concurrent.ListenableFuture; import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; import java.util.concurrent.Executor; import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import javax.annotation.Nullable; /** * An implementation of ListeningExecutorService that attempts to detect deadlock scenarios that * could occur if clients invoke the returned Future's get methods synchronously. *

* Deadlock scenarios are most apt to occur with a backing single-threaded executor where setting of * the Future's result is executed on the single thread. Here's a scenario: *

* The second submitted task will never execute since the single thread is currently executing * the client code which is blocked waiting for the submitted task to complete. Thus, deadlock has * occurred. *

* This class prevents this scenario via the use of a ThreadLocal variable. When a task is invoked, * the ThreadLocal is set and, when a task completes, the ThreadLocal is cleared. Futures returned * from this class override the get methods to check if the ThreadLocal is set. If it is, * an ExecutionException is thrown with a custom cause. * * @author Thomas Pantelis */ public class DeadlockDetectingListeningExecutorService extends AsyncNotifyingListeningExecutorService { private final ThreadLocal deadlockDetector = new ThreadLocal<>(); private final Function deadlockExceptionFunction; /** * Constructor. * * @param delegate the backing ExecutorService. * @param deadlockExceptionFunction Function that returns an Exception instance to set as the * cause of the ExecutionException when a deadlock is detected. */ public DeadlockDetectingListeningExecutorService( ExecutorService delegate, Function deadlockExceptionFunction ) { this(delegate, deadlockExceptionFunction, null); } /** * Constructor. * * @param delegate the backing ExecutorService. * @param deadlockExceptionFunction Function that returns an Exception instance to set as the * cause of the ExecutionException when a deadlock is detected. * @param listenableFutureExecutor the executor used to run listener callbacks asynchronously. * If null, no executor is used. */ public DeadlockDetectingListeningExecutorService( ExecutorService delegate, Function deadlockExceptionFunction, @Nullable Executor listenableFutureExecutor ) { super(delegate, listenableFutureExecutor); this.deadlockExceptionFunction = checkNotNull(deadlockExceptionFunction); } @Override public void execute( Runnable command ){ getDelegate().execute(wrapRunnable(command)); } @Override public ListenableFuture submit( Callable task ){ return wrapListenableFuture(super.submit(wrapCallable(task))); } @Override public ListenableFuture submit( Runnable task ){ return wrapListenableFuture(super.submit(wrapRunnable(task))); } @Override public ListenableFuture submit( Runnable task, T result ){ return wrapListenableFuture(super.submit(wrapRunnable(task), result)); } private Runnable wrapRunnable(final Runnable task) { return new Runnable() { @Override public void run() { deadlockDetector.set(Boolean.TRUE); try { task.run(); } finally { deadlockDetector.remove(); } } }; } private Callable wrapCallable(final Callable delagate) { return new Callable() { @Override public T call() throws Exception { deadlockDetector.set(Boolean.TRUE); try { return delagate.call(); } finally { deadlockDetector.remove(); } } }; } private ListenableFuture wrapListenableFuture(final ListenableFuture delegate ) { /* * This creates a forwarding Future that overrides calls to get(...) to check, via the * ThreadLocal, if the caller is doing a blocking call on a thread from this executor. If * so, we detect this as a deadlock and throw an ExecutionException even though it may not * be a deadlock if there are more than 1 thread in the pool. Either way, there's bad * practice somewhere, either on the client side for doing a blocking call or in the * framework's threading model. */ return new ForwardingListenableFuture.SimpleForwardingListenableFuture(delegate) { @Override public T get() throws InterruptedException, ExecutionException { checkDeadLockDetectorTL(); return super.get(); } @Override public T get(final long timeout, final TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { checkDeadLockDetectorTL(); return super.get(timeout, unit); } void checkDeadLockDetectorTL() throws ExecutionException { if (deadlockDetector.get() != null) { throw new ExecutionException("A potential deadlock was detected.", deadlockExceptionFunction.apply(null)); } } }; } }