Merge branch 'master' of ../controller
[yangtools.git] / common / util / src / test / java / org / opendaylight / yangtools / util / concurrent / DeadlockDetectingListeningExecutorServiceTest.java
1 /*
2  * Copyright (c) 2014 Brocade Communications Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.yangtools.util.concurrent;
9
10 import static org.junit.Assert.assertEquals;
11 import static org.junit.Assert.assertNotNull;
12 import static org.junit.Assert.assertTrue;
13 import static org.opendaylight.yangtools.util.concurrent.AsyncNotifyingListeningExecutorServiceTest.testListenerCallback;
14 import static org.opendaylight.yangtools.util.concurrent.CommonTestUtils.SUBMIT_CALLABLE;
15 import static org.opendaylight.yangtools.util.concurrent.CommonTestUtils.SUBMIT_RUNNABLE;
16 import static org.opendaylight.yangtools.util.concurrent.CommonTestUtils.SUBMIT_RUNNABLE_WITH_RESULT;
17
18 import com.google.common.util.concurrent.FutureCallback;
19 import com.google.common.util.concurrent.Futures;
20 import com.google.common.util.concurrent.ListenableFuture;
21 import com.google.common.util.concurrent.ListeningExecutorService;
22 import com.google.common.util.concurrent.MoreExecutors;
23 import com.google.common.util.concurrent.ThreadFactoryBuilder;
24 import java.util.concurrent.CountDownLatch;
25 import java.util.concurrent.ExecutionException;
26 import java.util.concurrent.Executor;
27 import java.util.concurrent.ExecutorService;
28 import java.util.concurrent.Executors;
29 import java.util.concurrent.TimeUnit;
30 import java.util.concurrent.atomic.AtomicReference;
31 import java.util.function.Supplier;
32 import org.junit.After;
33 import org.junit.Before;
34 import org.junit.Test;
35 import org.opendaylight.yangtools.util.concurrent.CommonTestUtils.Invoker;
36
37 /**
38  * Unit tests for DeadlockDetectingListeningExecutorService.
39  *
40  * @author Thomas Pantelis
41  */
42 public class DeadlockDetectingListeningExecutorServiceTest {
43
44     interface InitialInvoker {
45         void invokeExecutor(ListeningExecutorService executor, Runnable task);
46     }
47
48     static final InitialInvoker SUBMIT = ListeningExecutorService::submit;
49
50     static final InitialInvoker EXECUTE = Executor::execute;
51
52     public static class TestDeadlockException extends Exception {
53         private static final long serialVersionUID = 1L;
54
55     }
56
57     private static final Supplier<Exception> DEADLOCK_EXECUTOR_SUPPLIER = TestDeadlockException::new;
58
59     DeadlockDetectingListeningExecutorService executor;
60
61     @Before
62     public void setup() {
63     }
64
65     @After
66     public void tearDown() {
67         if (executor != null) {
68             executor.shutdownNow();
69         }
70     }
71
72     DeadlockDetectingListeningExecutorService newExecutor() {
73         return new DeadlockDetectingListeningExecutorService(Executors.newSingleThreadExecutor(),
74                 DEADLOCK_EXECUTOR_SUPPLIER);
75     }
76
77     @Test
78     public void testBlockingSubmitOffExecutor() throws Exception {
79
80         executor = newExecutor();
81
82         // Test submit with Callable.
83
84         ListenableFuture<String> future = executor.submit(() -> "foo");
85
86         assertEquals("Future result", "foo", future.get(5, TimeUnit.SECONDS));
87
88         // Test submit with Runnable.
89
90         executor.submit(() -> { }).get();
91
92         // Test submit with Runnable and value.
93
94         future = executor.submit(() -> { }, "foo");
95
96         assertEquals("Future result", "foo", future.get(5, TimeUnit.SECONDS));
97     }
98
99     @Test
100     @SuppressWarnings("checkstyle:illegalThrows")
101     public void testNonBlockingSubmitOnExecutorThread() throws Throwable {
102
103         executor = newExecutor();
104
105         testNonBlockingSubmitOnExecutorThread(SUBMIT, SUBMIT_CALLABLE);
106         testNonBlockingSubmitOnExecutorThread(SUBMIT, SUBMIT_RUNNABLE);
107         testNonBlockingSubmitOnExecutorThread(SUBMIT, SUBMIT_RUNNABLE_WITH_RESULT);
108
109         testNonBlockingSubmitOnExecutorThread(EXECUTE, SUBMIT_CALLABLE);
110     }
111
112     @SuppressWarnings("checkstyle:illegalThrows")
113     void testNonBlockingSubmitOnExecutorThread(final InitialInvoker initialInvoker, final Invoker invoker)
114             throws Throwable {
115
116         final AtomicReference<Throwable> caughtEx = new AtomicReference<>();
117         final CountDownLatch futureCompletedLatch = new CountDownLatch(1);
118
119         Runnable task = () -> Futures.addCallback(invoker.invokeExecutor(executor, null), new FutureCallback<Object>() {
120             @Override
121             public void onSuccess(final Object result) {
122                 futureCompletedLatch.countDown();
123             }
124
125             @Override
126             @SuppressWarnings("checkstyle:parameterName")
127             public void onFailure(final Throwable t) {
128                 caughtEx.set(t);
129                 futureCompletedLatch.countDown();
130             }
131         }, MoreExecutors.directExecutor());
132
133         initialInvoker.invokeExecutor(executor, task);
134
135         assertTrue("Task did not complete - executor likely deadlocked",
136                 futureCompletedLatch.await(5, TimeUnit.SECONDS));
137
138         if (caughtEx.get() != null) {
139             throw caughtEx.get();
140         }
141     }
142
143     @Test
144     public void testBlockingSubmitOnExecutorThread() throws InterruptedException {
145
146         executor = newExecutor();
147
148         testBlockingSubmitOnExecutorThread(SUBMIT, SUBMIT_CALLABLE);
149         testBlockingSubmitOnExecutorThread(SUBMIT, SUBMIT_RUNNABLE);
150         testBlockingSubmitOnExecutorThread(SUBMIT, SUBMIT_RUNNABLE_WITH_RESULT);
151
152         testBlockingSubmitOnExecutorThread(EXECUTE, SUBMIT_CALLABLE);
153     }
154
155     @SuppressWarnings("checkstyle:illegalCatch")
156     void testBlockingSubmitOnExecutorThread(final InitialInvoker initialInvoker, final Invoker invoker)
157             throws InterruptedException {
158
159         final AtomicReference<Throwable> caughtEx = new AtomicReference<>();
160         final CountDownLatch latch = new CountDownLatch(1);
161
162         Runnable task = () -> {
163
164             try {
165                 invoker.invokeExecutor(executor, null).get();
166             } catch (ExecutionException e) {
167                 caughtEx.set(e.getCause());
168             } catch (Throwable e) {
169                 caughtEx.set(e);
170             } finally {
171                 latch.countDown();
172             }
173         };
174
175         initialInvoker.invokeExecutor(executor, task);
176
177         assertTrue("Task did not complete - executor likely deadlocked", latch.await(5, TimeUnit.SECONDS));
178         assertNotNull("Expected exception thrown", caughtEx.get());
179         assertEquals("Caught exception type", TestDeadlockException.class, caughtEx.get().getClass());
180     }
181
182     @Test
183     public void testListenableFutureCallbackWithExecutor() throws InterruptedException {
184
185         String listenerThreadPrefix = "ListenerThread";
186         ExecutorService listenerExecutor = Executors.newFixedThreadPool(1,
187                 new ThreadFactoryBuilder().setNameFormat(listenerThreadPrefix + "-%d").build());
188
189         executor = new DeadlockDetectingListeningExecutorService(
190             Executors.newSingleThreadExecutor(new ThreadFactoryBuilder().setNameFormat("SingleThread").build()),
191                 DEADLOCK_EXECUTOR_SUPPLIER, listenerExecutor);
192
193         try {
194             testListenerCallback(executor, SUBMIT_CALLABLE, listenerThreadPrefix);
195             testListenerCallback(executor, SUBMIT_RUNNABLE, listenerThreadPrefix);
196             testListenerCallback(executor, SUBMIT_RUNNABLE_WITH_RESULT, listenerThreadPrefix);
197         } finally {
198             listenerExecutor.shutdownNow();
199         }
200     }
201 }