/*
 * Copyright (c) 2014 Brocade Communications Systems, Inc. and others. All rights reserved.
 *
 * This program and the accompanying materials are made available under the
 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
 * and is available at http://www.eclipse.org/legal/epl-v10.html
 */
9 package org.opendaylight.yangtools.util.concurrent;
11 import static org.junit.Assert.assertEquals;
12 import static org.junit.Assert.assertNotNull;
13 import static org.junit.Assert.assertTrue;
14 import static org.opendaylight.yangtools.util.concurrent.AsyncNotifyingListeningExecutorServiceTest.testListenerCallback;
15 import static org.opendaylight.yangtools.util.concurrent.CommonTestUtils.SUBMIT_CALLABLE;
16 import static org.opendaylight.yangtools.util.concurrent.CommonTestUtils.SUBMIT_RUNNABLE;
17 import static org.opendaylight.yangtools.util.concurrent.CommonTestUtils.SUBMIT_RUNNABLE_WITH_RESULT;
19 import com.google.common.base.Supplier;
20 import com.google.common.util.concurrent.FutureCallback;
21 import com.google.common.util.concurrent.Futures;
22 import com.google.common.util.concurrent.ListenableFuture;
23 import com.google.common.util.concurrent.ListeningExecutorService;
24 import com.google.common.util.concurrent.MoreExecutors;
25 import com.google.common.util.concurrent.ThreadFactoryBuilder;
26 import java.util.concurrent.CountDownLatch;
27 import java.util.concurrent.ExecutionException;
28 import java.util.concurrent.Executor;
29 import java.util.concurrent.ExecutorService;
30 import java.util.concurrent.Executors;
31 import java.util.concurrent.TimeUnit;
32 import java.util.concurrent.atomic.AtomicReference;
33 import javax.annotation.Nonnull;
34 import org.junit.After;
35 import org.junit.Before;
36 import org.junit.Test;
37 import org.opendaylight.yangtools.util.concurrent.CommonTestUtils.Invoker;
/**
 * Unit tests for DeadlockDetectingListeningExecutorService.
 *
 * @author Thomas Pantelis
 */
44 public class DeadlockDetectingListeningExecutorServiceTest {
46 interface InitialInvoker {
47 void invokeExecutor(ListeningExecutorService executor, Runnable task);
50 static final InitialInvoker SUBMIT = ListeningExecutorService::submit;
52 static final InitialInvoker EXECUTE = Executor::execute;
54 public static class TestDeadlockException extends Exception {
55 private static final long serialVersionUID = 1L;
59 private static final Supplier<Exception> DEADLOCK_EXECUTOR_SUPPLIER = TestDeadlockException::new;
61 DeadlockDetectingListeningExecutorService executor;
68 public void tearDown() {
69 if (executor != null) {
70 executor.shutdownNow();
74 DeadlockDetectingListeningExecutorService newExecutor() {
75 return new DeadlockDetectingListeningExecutorService(Executors.newSingleThreadExecutor(),
76 DEADLOCK_EXECUTOR_SUPPLIER);
80 public void testBlockingSubmitOffExecutor() throws Exception {
82 executor = newExecutor();
84 // Test submit with Callable.
86 ListenableFuture<String> future = executor.submit(() -> "foo");
88 assertEquals("Future result", "foo", future.get(5, TimeUnit.SECONDS));
90 // Test submit with Runnable.
92 executor.submit(() -> { }).get();
94 // Test submit with Runnable and value.
96 future = executor.submit(() -> { }, "foo");
98 assertEquals("Future result", "foo", future.get(5, TimeUnit.SECONDS));
102 @SuppressWarnings("checkstyle:illegalThrows")
103 public void testNonBlockingSubmitOnExecutorThread() throws Throwable {
105 executor = newExecutor();
107 testNonBlockingSubmitOnExecutorThread(SUBMIT, SUBMIT_CALLABLE);
108 testNonBlockingSubmitOnExecutorThread(SUBMIT, SUBMIT_RUNNABLE);
109 testNonBlockingSubmitOnExecutorThread(SUBMIT, SUBMIT_RUNNABLE_WITH_RESULT);
111 testNonBlockingSubmitOnExecutorThread(EXECUTE, SUBMIT_CALLABLE);
114 @SuppressWarnings("checkstyle:illegalThrows")
115 void testNonBlockingSubmitOnExecutorThread(final InitialInvoker initialInvoker, final Invoker invoker)
118 final AtomicReference<Throwable> caughtEx = new AtomicReference<>();
119 final CountDownLatch futureCompletedLatch = new CountDownLatch(1);
121 Runnable task = () -> Futures.addCallback(invoker.invokeExecutor(executor, null), new FutureCallback<Object>() {
123 public void onSuccess(final Object result) {
124 futureCompletedLatch.countDown();
128 @SuppressWarnings("checkstyle:parameterName")
129 public void onFailure(@Nonnull final Throwable t) {
131 futureCompletedLatch.countDown();
133 }, MoreExecutors.directExecutor());
135 initialInvoker.invokeExecutor(executor, task);
137 assertTrue("Task did not complete - executor likely deadlocked",
138 futureCompletedLatch.await(5, TimeUnit.SECONDS));
140 if (caughtEx.get() != null) {
141 throw caughtEx.get();
146 public void testBlockingSubmitOnExecutorThread() throws InterruptedException {
148 executor = newExecutor();
150 testBlockingSubmitOnExecutorThread(SUBMIT, SUBMIT_CALLABLE);
151 testBlockingSubmitOnExecutorThread(SUBMIT, SUBMIT_RUNNABLE);
152 testBlockingSubmitOnExecutorThread(SUBMIT, SUBMIT_RUNNABLE_WITH_RESULT);
154 testBlockingSubmitOnExecutorThread(EXECUTE, SUBMIT_CALLABLE);
157 @SuppressWarnings("checkstyle:illegalCatch")
158 void testBlockingSubmitOnExecutorThread(final InitialInvoker initialInvoker, final Invoker invoker)
159 throws InterruptedException {
161 final AtomicReference<Throwable> caughtEx = new AtomicReference<>();
162 final CountDownLatch latch = new CountDownLatch(1);
164 Runnable task = () -> {
167 invoker.invokeExecutor(executor, null).get();
168 } catch (ExecutionException e) {
169 caughtEx.set(e.getCause());
170 } catch (Throwable e) {
177 initialInvoker.invokeExecutor(executor, task);
179 assertTrue("Task did not complete - executor likely deadlocked", latch.await(5, TimeUnit.SECONDS));
180 assertNotNull("Expected exception thrown", caughtEx.get());
181 assertEquals("Caught exception type", TestDeadlockException.class, caughtEx.get().getClass());
185 public void testListenableFutureCallbackWithExecutor() throws InterruptedException {
187 String listenerThreadPrefix = "ListenerThread";
188 ExecutorService listenerExecutor = Executors.newFixedThreadPool(1,
189 new ThreadFactoryBuilder().setNameFormat(listenerThreadPrefix + "-%d").build());
191 executor = new DeadlockDetectingListeningExecutorService(
192 Executors.newSingleThreadExecutor(new ThreadFactoryBuilder().setNameFormat("SingleThread").build()),
193 DEADLOCK_EXECUTOR_SUPPLIER, listenerExecutor);
196 testListenerCallback(executor, SUBMIT_CALLABLE, listenerThreadPrefix);
197 testListenerCallback(executor, SUBMIT_RUNNABLE, listenerThreadPrefix);
198 testListenerCallback(executor, SUBMIT_RUNNABLE_WITH_RESULT, listenerThreadPrefix);
200 listenerExecutor.shutdownNow();