/* * Copyright (c) 2014 Brocade Communications Systems, Inc. and others. All rights reserved. * * This program and the accompanying materials are made available under the * terms of the Eclipse Public License v1.0 which accompanies this distribution, * and is available at http://www.eclipse.org/legal/epl-v10.html */ package org.opendaylight.yangtools.util.concurrent; import static com.google.common.base.Preconditions.checkNotNull; import com.google.common.base.Function; import com.google.common.util.concurrent.AbstractListeningExecutorService; import com.google.common.util.concurrent.ForwardingListenableFuture; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.ListenableFutureTask; import java.util.List; import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; /** * An implementation of ListeningExecutorService that attempts to detect deadlock scenarios that * could occur if clients invoke the returned Future's get methods synchronously. *

* Deadlock scenarios are most apt to occur with a backing single-threaded executor where setting of * the Future's result is executed on the single thread. Here's a scenario: *

* The second submitted task will never execute since the single thread is currently executing * the client code which is blocked waiting for the submitted task to complete. Thus, deadlock has * occurred. *

* This class prevents this scenario via the use of a ThreadLocal variable. When a task is invoked, * the ThreadLocal is set and, when a task completes, the ThreadLocal is cleared. Futures returned * from this class override the get methods to check if the ThreadLocal is set. If it is, * an ExecutionException is thrown with a custom cause. * * @author Thomas Pantelis */ public class DeadlockDetectingListeningExecutorService extends AbstractListeningExecutorService { private final ThreadLocal deadlockDetector = new ThreadLocal<>(); private final Function deadlockExceptionFunction; private final ExecutorService delegate; /** * Constructor. * * @param delegate the backing ExecutorService. * @param deadlockExceptionFunction Function that returns an Exception instance to set as the * cause of the ExecutionException when a deadlock is detected. */ public DeadlockDetectingListeningExecutorService(final ExecutorService delegate, final Function deadlockExceptionFunction) { this.delegate = checkNotNull(delegate); this.deadlockExceptionFunction = checkNotNull(deadlockExceptionFunction); } @Override public boolean awaitTermination(final long timeout, final TimeUnit unit) throws InterruptedException { return delegate.awaitTermination(timeout, unit); } @Override public boolean isShutdown() { return delegate.isShutdown(); } @Override public boolean isTerminated() { return delegate.isTerminated(); } @Override public void shutdown() { delegate.shutdown(); } @Override public List shutdownNow() { return delegate.shutdownNow(); } @Override public void execute(final Runnable command) { delegate.execute(wrapRunnable(command)); } @Override public ListenableFuture submit(final Callable task ) { final ListenableFutureTask futureTask = ListenableFutureTask.create(wrapCallable(task)); delegate.execute(futureTask); return wrapListenableFuture(futureTask); } @Override public ListenableFuture submit( final Runnable task ) { ListenableFutureTask futureTask = ListenableFutureTask.create(wrapRunnable(task), null); delegate.execute(futureTask); return wrapListenableFuture(futureTask); } @Override public ListenableFuture submit(final Runnable task, final T result) { ListenableFutureTask futureTask = ListenableFutureTask.create(wrapRunnable(task), result); delegate.execute(futureTask); return wrapListenableFuture(futureTask); } private Runnable wrapRunnable(final Runnable task) { return new Runnable() { @Override public void run() { deadlockDetector.set(Boolean.TRUE); try { task.run(); } finally { deadlockDetector.set(null); } } }; } private Callable wrapCallable(final Callable delagate) { return new Callable() { @Override public T call() throws Exception { deadlockDetector.set(Boolean.TRUE); try { return delagate.call(); } finally { deadlockDetector.set(null); } } }; } private ListenableFuture wrapListenableFuture(final ListenableFuture delegate ) { /* * This creates a forwarding Future that overrides calls to get(...) to check, via the ThreadLocal, * if the caller is doing a blocking call on a thread from this executor. If so, we detect this as * a deadlock and throw an ExecutionException even though it may not be a deadlock if there are * more than 1 thread in the pool. Either way, there's bad practice somewhere, either on the client * side for doing a blocking call or in the framework's threading model. */ return new ForwardingListenableFuture.SimpleForwardingListenableFuture(delegate) { @Override public T get() throws InterruptedException, ExecutionException { checkDeadLockDetectorTL(); return super.get(); } @Override public T get(final long timeout, final TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { checkDeadLockDetectorTL(); return super.get(timeout, unit); } void checkDeadLockDetectorTL() throws ExecutionException { if (deadlockDetector.get() != null) { throw new ExecutionException("A potential deadlock was detected.", deadlockExceptionFunction.apply(null)); } } }; } }