Propagate @Nonnull and @Nullable annotations
[yangtools.git] / common / util / src / test / java / org / opendaylight / yangtools / util / concurrent / DeadlockDetectingListeningExecutorServiceTest.java
1 /*
2  * Copyright (c) 2014 Brocade Communications Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.yangtools.util.concurrent;
10
11 import static org.junit.Assert.assertEquals;
12 import static org.junit.Assert.assertNotNull;
13 import static org.junit.Assert.assertTrue;
14 import static org.opendaylight.yangtools.util.concurrent.AsyncNotifyingListeningExecutorServiceTest.testListenerCallback;
15 import static org.opendaylight.yangtools.util.concurrent.CommonTestUtils.SUBMIT_CALLABLE;
16 import static org.opendaylight.yangtools.util.concurrent.CommonTestUtils.SUBMIT_RUNNABLE;
17 import static org.opendaylight.yangtools.util.concurrent.CommonTestUtils.SUBMIT_RUNNABLE_WITH_RESULT;
18
19 import com.google.common.base.Supplier;
20 import com.google.common.util.concurrent.FutureCallback;
21 import com.google.common.util.concurrent.Futures;
22 import com.google.common.util.concurrent.ListenableFuture;
23 import com.google.common.util.concurrent.ListeningExecutorService;
24 import com.google.common.util.concurrent.ThreadFactoryBuilder;
25 import java.util.concurrent.CountDownLatch;
26 import java.util.concurrent.ExecutionException;
27 import java.util.concurrent.Executor;
28 import java.util.concurrent.ExecutorService;
29 import java.util.concurrent.Executors;
30 import java.util.concurrent.TimeUnit;
31 import java.util.concurrent.atomic.AtomicReference;
32 import javax.annotation.Nonnull;
33 import org.junit.After;
34 import org.junit.Before;
35 import org.junit.Test;
36 import org.opendaylight.yangtools.util.concurrent.CommonTestUtils.Invoker;
37
38 /**
39  * Unit tests for DeadlockDetectingListeningExecutorService.
40  *
41  * @author Thomas Pantelis
42  */
43 public class DeadlockDetectingListeningExecutorServiceTest {
44
45     interface InitialInvoker {
46         void invokeExecutor(ListeningExecutorService executor, Runnable task);
47     }
48
49     static final InitialInvoker SUBMIT = ListeningExecutorService::submit;
50
51     static final InitialInvoker EXECUTE = Executor::execute;
52
53     public static class TestDeadlockException extends Exception {
54         private static final long serialVersionUID = 1L;
55
56     }
57
58     private static final Supplier<Exception> DEADLOCK_EXECUTOR_SUPPLIER = TestDeadlockException::new;
59
60     DeadlockDetectingListeningExecutorService executor;
61
62     @Before
63     public void setup() {
64     }
65
66     @After
67     public void tearDown() {
68         if (executor != null) {
69             executor.shutdownNow();
70         }
71     }
72
73     DeadlockDetectingListeningExecutorService newExecutor() {
74         return new DeadlockDetectingListeningExecutorService(Executors.newSingleThreadExecutor(),
75                 DEADLOCK_EXECUTOR_SUPPLIER);
76     }
77
78     @Test
79     public void testBlockingSubmitOffExecutor() throws Exception {
80
81         executor = newExecutor();
82
83         // Test submit with Callable.
84
85         ListenableFuture<String> future = executor.submit(() -> "foo");
86
87         assertEquals( "Future result", "foo", future.get( 5, TimeUnit.SECONDS));
88
89         // Test submit with Runnable.
90
91         executor.submit(() -> {
92         }).get();
93
94         // Test submit with Runnable and value.
95
96         future = executor.submit(() -> { }, "foo");
97
98         assertEquals("Future result", "foo", future.get(5, TimeUnit.SECONDS));
99     }
100
101     @Test
102     public void testNonBlockingSubmitOnExecutorThread() throws Throwable {
103
104         executor = newExecutor();
105
106         testNonBlockingSubmitOnExecutorThread(SUBMIT, SUBMIT_CALLABLE);
107         testNonBlockingSubmitOnExecutorThread(SUBMIT, SUBMIT_RUNNABLE);
108         testNonBlockingSubmitOnExecutorThread(SUBMIT, SUBMIT_RUNNABLE_WITH_RESULT);
109
110         testNonBlockingSubmitOnExecutorThread(EXECUTE, SUBMIT_CALLABLE);
111     }
112
113     void testNonBlockingSubmitOnExecutorThread(final InitialInvoker initialInvoker, final Invoker invoker)
114             throws Throwable {
115
116         final AtomicReference<Throwable> caughtEx = new AtomicReference<>();
117         final CountDownLatch futureCompletedLatch = new CountDownLatch(1);
118
119         Runnable task = () -> Futures.addCallback(invoker.invokeExecutor(executor, null), new FutureCallback<Object>() {
120             @Override
121             public void onSuccess(final Object result) {
122                 futureCompletedLatch.countDown();
123             }
124
125             @Override
126             public void onFailure(@Nonnull final Throwable t) {
127                 caughtEx.set(t);
128                 futureCompletedLatch.countDown();
129             }
130         });
131
132         initialInvoker.invokeExecutor(executor, task);
133
134         assertTrue("Task did not complete - executor likely deadlocked",
135                 futureCompletedLatch.await(5, TimeUnit.SECONDS));
136
137         if (caughtEx.get() != null) {
138             throw caughtEx.get();
139         }
140     }
141
142     @Test
143     public void testBlockingSubmitOnExecutorThread() throws Exception {
144
145         executor = newExecutor();
146
147         testBlockingSubmitOnExecutorThread(SUBMIT, SUBMIT_CALLABLE);
148         testBlockingSubmitOnExecutorThread(SUBMIT, SUBMIT_RUNNABLE);
149         testBlockingSubmitOnExecutorThread(SUBMIT, SUBMIT_RUNNABLE_WITH_RESULT);
150
151         testBlockingSubmitOnExecutorThread(EXECUTE, SUBMIT_CALLABLE);
152     }
153
154     void testBlockingSubmitOnExecutorThread(final InitialInvoker initialInvoker, final Invoker invoker)
155             throws Exception {
156
157         final AtomicReference<Throwable> caughtEx = new AtomicReference<>();
158         final CountDownLatch latch = new CountDownLatch(1);
159
160         Runnable task = () -> {
161
162             try {
163                 invoker.invokeExecutor(executor, null).get();
164             } catch(ExecutionException e) {
165                 caughtEx.set(e.getCause());
166             } catch(Throwable e) {
167                 caughtEx.set(e);
168             } finally {
169                 latch.countDown();
170             }
171         };
172
173         initialInvoker.invokeExecutor(executor, task);
174
175         assertTrue("Task did not complete - executor likely deadlocked", latch.await( 5, TimeUnit.SECONDS));
176         assertNotNull("Expected exception thrown", caughtEx.get());
177         assertEquals("Caught exception type", TestDeadlockException.class, caughtEx.get().getClass());
178     }
179
180     @Test
181     public void testListenableFutureCallbackWithExecutor() throws InterruptedException {
182
183         String listenerThreadPrefix = "ListenerThread";
184         ExecutorService listenerExecutor = Executors.newFixedThreadPool(1,
185                 new ThreadFactoryBuilder().setNameFormat(listenerThreadPrefix + "-%d").build());
186
187         executor = new DeadlockDetectingListeningExecutorService(
188             Executors.newSingleThreadExecutor(new ThreadFactoryBuilder().setNameFormat("SingleThread").build()),
189                 DEADLOCK_EXECUTOR_SUPPLIER, listenerExecutor);
190
191         try {
192             testListenerCallback(executor, SUBMIT_CALLABLE, listenerThreadPrefix);
193             testListenerCallback(executor, SUBMIT_RUNNABLE, listenerThreadPrefix);
194             testListenerCallback(executor, SUBMIT_RUNNABLE_WITH_RESULT, listenerThreadPrefix);
195         } finally {
196             listenerExecutor.shutdownNow();
197         }
198     }
199 }