X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=common%2Futil%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fyangtools%2Futil%2Fconcurrent%2FDeadlockDetectingListeningExecutorService.java;h=de4a91e5a80605faa49b2d8fbb36234f38a36754;hb=0df9e95c2849172793642a654ef1061811d40f30;hp=fc17d890f5ef51b5e5e889f57ae0e56a96c5d6f4;hpb=56c517b8d3d33bb45f5b75339140c8ea23d15add;p=yangtools.git diff --git a/common/util/src/main/java/org/opendaylight/yangtools/util/concurrent/DeadlockDetectingListeningExecutorService.java b/common/util/src/main/java/org/opendaylight/yangtools/util/concurrent/DeadlockDetectingListeningExecutorService.java index fc17d890f5..de4a91e5a8 100644 --- a/common/util/src/main/java/org/opendaylight/yangtools/util/concurrent/DeadlockDetectingListeningExecutorService.java +++ b/common/util/src/main/java/org/opendaylight/yangtools/util/concurrent/DeadlockDetectingListeningExecutorService.java @@ -5,13 +5,12 @@ * terms of the Eclipse Public License v1.0 which accompanies this distribution, * and is available at http://www.eclipse.org/legal/epl-v10.html */ - package org.opendaylight.yangtools.util.concurrent; import static com.google.common.base.Preconditions.checkState; import static java.util.Objects.requireNonNull; -import com.google.common.util.concurrent.ForwardingListenableFuture; +import com.google.common.util.concurrent.ForwardingListenableFuture.SimpleForwardingListenableFuture; import com.google.common.util.concurrent.ListenableFuture; import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; @@ -20,32 +19,30 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.function.Supplier; -import javax.annotation.Nonnull; -import javax.annotation.Nullable; +import org.eclipse.jdt.annotation.NonNull; +import org.eclipse.jdt.annotation.Nullable; /** - * An implementation of ListeningExecutorService that attempts to detect deadlock scenarios that - * could occur if clients invoke the returned Future's get methods synchronously. + * An implementation of ListeningExecutorService that attempts to detect deadlock scenarios that could occur if clients + * invoke the returned Future's get methods synchronously. * - *

Deadlock scenarios are most apt to occur with a backing single-threaded executor where setting of - * the Future's result is executed on the single thread. Here's a scenario: + *

Deadlock scenarios are most apt to occur with a backing single-threaded executor where setting of the Future's + * result is executed on the single thread. Here's a scenario: *

- * The second submitted task will never execute since the single thread is currently executing - * the client code which is blocked waiting for the submitted task to complete. Thus, deadlock has - * occurred. + * The second submitted task will never execute since the single thread is currently executing the client code which + * is blocked waiting for the submitted task to complete. Thus, deadlock has occurred. * - *

This class prevents this scenario via the use of a ThreadLocal variable. When a task is invoked, - * the ThreadLocal is set and, when a task completes, the ThreadLocal is cleared. Futures returned - * from this class override the get methods to check if the ThreadLocal is set. If it is, - * an ExecutionException is thrown with a custom cause. + *

This class prevents this scenario via the use of a ThreadLocal variable. When a task is invoked, the ThreadLocal + * is set and, when a task completes, the ThreadLocal is cleared. Futures returned from this class override + * the {@code get} methods to check if the ThreadLocal is set. If it is, an ExecutionException is thrown with a custom + * cause. * - *

Note that the ThreadLocal is not removed automatically, so some state may be left hanging off of - * threads which have encountered this class. If you need to clean that state up, use - * {@link #cleanStateForCurrentThread()}. + *

Note that the ThreadLocal is not removed automatically, so some state may be left hanging off of threads which + * have encountered this class. If you need to clean that state up, use {@link #cleanStateForCurrentThread()}. * * @author Thomas Pantelis * @author Robert Varga @@ -56,8 +53,8 @@ public class DeadlockDetectingListeningExecutorService extends AsyncNotifyingLis * tasks may be submitted to underlay and some to overlay service -- and the two cases need to * be discerned reliably. */ - private final SettableBooleanThreadLocal deadlockDetector = new SettableBooleanThreadLocal(); - private final Supplier deadlockExceptionFunction; + private final @NonNull SettableBooleanThreadLocal deadlockDetector = new SettableBooleanThreadLocal(); + private final @NonNull Supplier deadlockExceptionFunction; /** * Constructor. @@ -66,8 +63,8 @@ public class DeadlockDetectingListeningExecutorService extends AsyncNotifyingLis * @param deadlockExceptionSupplier Supplier that returns an Exception instance to set as the * cause of the ExecutionException when a deadlock is detected. */ - public DeadlockDetectingListeningExecutorService(final ExecutorService delegate, - @Nonnull final Supplier deadlockExceptionSupplier) { + public DeadlockDetectingListeningExecutorService(final @NonNull ExecutorService delegate, + final @NonNull Supplier deadlockExceptionSupplier) { this(delegate, deadlockExceptionSupplier, null); } @@ -80,31 +77,28 @@ public class DeadlockDetectingListeningExecutorService extends AsyncNotifyingLis * @param listenableFutureExecutor the executor used to run listener callbacks asynchronously. * If null, no executor is used. */ - public DeadlockDetectingListeningExecutorService(final ExecutorService delegate, - @Nonnull final Supplier deadlockExceptionSupplier, + public DeadlockDetectingListeningExecutorService(final @NonNull ExecutorService delegate, + @NonNull final Supplier deadlockExceptionSupplier, @Nullable final Executor listenableFutureExecutor) { super(delegate, listenableFutureExecutor); this.deadlockExceptionFunction = requireNonNull(deadlockExceptionSupplier); } @Override - public void execute(@Nonnull final Runnable command) { + public void execute(final Runnable command) { getDelegate().execute(wrapRunnable(command)); } - @Nonnull @Override public ListenableFuture submit(final Callable task) { return wrapListenableFuture(super.submit(wrapCallable(task))); } - @Nonnull @Override public ListenableFuture submit(final Runnable task) { return wrapListenableFuture(super.submit(wrapRunnable(task))); } - @Nonnull @Override public ListenableFuture submit(final Runnable task, final T result) { return wrapListenableFuture(super.submit(wrapRunnable(task), result)); @@ -118,14 +112,15 @@ public class DeadlockDetectingListeningExecutorService extends AsyncNotifyingLis deadlockDetector.remove(); } - private SettableBoolean primeDetector() { + private @NonNull SettableBoolean primeDetector() { final SettableBoolean b = deadlockDetector.get(); checkState(!b.isSet(), "Detector for {} has already been primed", this); b.set(); return b; } - private Runnable wrapRunnable(final Runnable task) { + private @NonNull Runnable wrapRunnable(final @Nullable Runnable task) { + requireNonNull(task); return () -> { final SettableBoolean b = primeDetector(); try { @@ -136,18 +131,19 @@ public class DeadlockDetectingListeningExecutorService extends AsyncNotifyingLis }; } - private Callable wrapCallable(final Callable delagate) { + private @NonNull Callable wrapCallable(final @NonNull Callable task) { + requireNonNull(task); return () -> { final SettableBoolean b = primeDetector(); try { - return delagate.call(); + return task.call(); } finally { b.reset(); } }; } - private ListenableFuture wrapListenableFuture(final ListenableFuture delegate) { + private @NonNull ListenableFuture wrapListenableFuture(final @NonNull ListenableFuture delegate) { /* * This creates a forwarding Future that overrides calls to get(...) to check, via the * ThreadLocal, if the caller is doing a blocking call on a thread from this executor. If @@ -156,7 +152,7 @@ public class DeadlockDetectingListeningExecutorService extends AsyncNotifyingLis * practice somewhere, either on the client side for doing a blocking call or in the * framework's threading model. */ - return new ForwardingListenableFuture.SimpleForwardingListenableFuture(delegate) { + return new SimpleForwardingListenableFuture(delegate) { @Override public T get() throws InterruptedException, ExecutionException { checkDeadLockDetectorTL(); @@ -164,8 +160,8 @@ public class DeadlockDetectingListeningExecutorService extends AsyncNotifyingLis } @Override - public T get(final long timeout, @Nonnull final TimeUnit unit) - throws InterruptedException, ExecutionException, TimeoutException { + public T get(final long timeout, final TimeUnit unit) throws InterruptedException, ExecutionException, + TimeoutException { checkDeadLockDetectorTL(); return super.get(timeout, unit); }