fa322277f732d0f108c6e45ed008ff13b6a3cb71
[yangtools.git] / common / util / src / test / java / org / opendaylight / yangtools / util / concurrent / DeadlockDetectingListeningExecutorServiceTest.java
1 /*
2  * Copyright (c) 2014 Brocade Communications Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.yangtools.util.concurrent;
10
11 import static org.junit.Assert.assertEquals;
12 import static org.junit.Assert.assertNotNull;
13 import static org.junit.Assert.assertTrue;
14 import static org.opendaylight.yangtools.util.concurrent.AsyncNotifyingListeningExecutorServiceTest.testListenerCallback;
15 import static org.opendaylight.yangtools.util.concurrent.CommonTestUtils.SUBMIT_CALLABLE;
16 import static org.opendaylight.yangtools.util.concurrent.CommonTestUtils.SUBMIT_RUNNABLE;
17 import static org.opendaylight.yangtools.util.concurrent.CommonTestUtils.SUBMIT_RUNNABLE_WITH_RESULT;
18
19 import com.google.common.base.Supplier;
20 import com.google.common.util.concurrent.FutureCallback;
21 import com.google.common.util.concurrent.Futures;
22 import com.google.common.util.concurrent.ListenableFuture;
23 import com.google.common.util.concurrent.ListeningExecutorService;
24 import com.google.common.util.concurrent.ThreadFactoryBuilder;
25 import java.util.concurrent.CountDownLatch;
26 import java.util.concurrent.ExecutionException;
27 import java.util.concurrent.Executor;
28 import java.util.concurrent.ExecutorService;
29 import java.util.concurrent.Executors;
30 import java.util.concurrent.TimeUnit;
31 import java.util.concurrent.atomic.AtomicReference;
32 import org.junit.After;
33 import org.junit.Before;
34 import org.junit.Test;
35 import org.opendaylight.yangtools.util.concurrent.CommonTestUtils.Invoker;
36
37 /**
38  * Unit tests for DeadlockDetectingListeningExecutorService.
39  *
40  * @author Thomas Pantelis
41  */
42 public class DeadlockDetectingListeningExecutorServiceTest {
43
44     interface InitialInvoker {
45         void invokeExecutor( ListeningExecutorService executor, Runnable task );
46     }
47
48     static final InitialInvoker SUBMIT = ListeningExecutorService::submit;
49
50     static final InitialInvoker EXECUTE = Executor::execute;
51
52     @SuppressWarnings("serial")
53     public static class TestDeadlockException extends Exception {
54     }
55
56     private static final Supplier<Exception> DEADLOCK_EXECUTOR_SUPPLIER = TestDeadlockException::new;
57
58     DeadlockDetectingListeningExecutorService executor;
59
60     @Before
61     public void setup() {
62     }
63
64     @After
65     public void tearDown() {
66         if (executor != null ) {
67             executor.shutdownNow();
68         }
69     }
70
71     DeadlockDetectingListeningExecutorService newExecutor() {
72         return new DeadlockDetectingListeningExecutorService( Executors.newSingleThreadExecutor(),
73                 DEADLOCK_EXECUTOR_SUPPLIER );
74     }
75
76     @Test
77     public void testBlockingSubmitOffExecutor() throws Exception {
78
79         executor = newExecutor();
80
81         // Test submit with Callable.
82
83         ListenableFuture<String> future = executor.submit(() -> "foo");
84
85         assertEquals( "Future result", "foo", future.get( 5, TimeUnit.SECONDS ) );
86
87         // Test submit with Runnable.
88
89         executor.submit(() -> {
90         }).get();
91
92         // Test submit with Runnable and value.
93
94         future = executor.submit(() -> {
95         }, "foo" );
96
97         assertEquals( "Future result", "foo", future.get( 5, TimeUnit.SECONDS ) );
98     }
99
100     @Test
101     public void testNonBlockingSubmitOnExecutorThread() throws Throwable {
102
103         executor = newExecutor();
104
105         testNonBlockingSubmitOnExecutorThread( SUBMIT, SUBMIT_CALLABLE );
106         testNonBlockingSubmitOnExecutorThread( SUBMIT, SUBMIT_RUNNABLE );
107         testNonBlockingSubmitOnExecutorThread( SUBMIT, SUBMIT_RUNNABLE_WITH_RESULT );
108
109         testNonBlockingSubmitOnExecutorThread( EXECUTE, SUBMIT_CALLABLE );
110     }
111
112     void testNonBlockingSubmitOnExecutorThread( final InitialInvoker initialInvoker,
113             final Invoker invoker ) throws Throwable {
114
115         final AtomicReference<Throwable> caughtEx = new AtomicReference<>();
116         final CountDownLatch futureCompletedLatch = new CountDownLatch( 1 );
117
118         Runnable task = () -> Futures.addCallback( invoker.invokeExecutor( executor, null ), new FutureCallback() {
119             @Override
120             public void onSuccess( final Object result ) {
121                 futureCompletedLatch.countDown();
122             }
123
124             @Override
125             public void onFailure( final Throwable t ) {
126                 caughtEx.set( t );
127                 futureCompletedLatch.countDown();
128             }
129         } );
130
131         initialInvoker.invokeExecutor( executor, task );
132
133         assertTrue( "Task did not complete - executor likely deadlocked",
134                 futureCompletedLatch.await( 5, TimeUnit.SECONDS ) );
135
136         if (caughtEx.get() != null ) {
137             throw caughtEx.get();
138         }
139     }
140
141     @Test
142     public void testBlockingSubmitOnExecutorThread() throws Exception {
143
144         executor = newExecutor();
145
146         testBlockingSubmitOnExecutorThread( SUBMIT, SUBMIT_CALLABLE );
147         testBlockingSubmitOnExecutorThread( SUBMIT, SUBMIT_RUNNABLE );
148         testBlockingSubmitOnExecutorThread( SUBMIT, SUBMIT_RUNNABLE_WITH_RESULT );
149
150         testBlockingSubmitOnExecutorThread( EXECUTE, SUBMIT_CALLABLE );
151     }
152
153     void testBlockingSubmitOnExecutorThread( final InitialInvoker initialInvoker,
154             final Invoker invoker ) throws Exception {
155
156         final AtomicReference<Throwable> caughtEx = new AtomicReference<>();
157         final CountDownLatch latch = new CountDownLatch( 1 );
158
159         Runnable task = () -> {
160
161             try {
162                 invoker.invokeExecutor( executor, null ).get();
163             } catch( ExecutionException e ) {
164                 caughtEx.set( e.getCause() );
165             } catch( Throwable e ) {
166                 caughtEx.set( e );
167             } finally {
168                 latch.countDown();
169             }
170         };
171
172         initialInvoker.invokeExecutor( executor, task );
173
174         assertTrue( "Task did not complete - executor likely deadlocked",
175                 latch.await( 5, TimeUnit.SECONDS ) );
176
177         assertNotNull( "Expected exception thrown", caughtEx.get() );
178         assertEquals( "Caught exception type", TestDeadlockException.class, caughtEx.get().getClass() );
179     }
180
181     @Test
182     public void testListenableFutureCallbackWithExecutor() throws InterruptedException {
183
184         String listenerThreadPrefix = "ListenerThread";
185         ExecutorService listenerExecutor = Executors.newFixedThreadPool( 1,
186                 new ThreadFactoryBuilder().setNameFormat( listenerThreadPrefix + "-%d" ).build() );
187
188         executor = new DeadlockDetectingListeningExecutorService(
189                 Executors.newSingleThreadExecutor(
190                         new ThreadFactoryBuilder().setNameFormat( "SingleThread" ).build() ),
191                         DEADLOCK_EXECUTOR_SUPPLIER, listenerExecutor );
192
193         try {
194             testListenerCallback( executor, SUBMIT_CALLABLE, listenerThreadPrefix );
195             testListenerCallback( executor, SUBMIT_RUNNABLE, listenerThreadPrefix );
196             testListenerCallback( executor, SUBMIT_RUNNABLE_WITH_RESULT, listenerThreadPrefix );
197         } finally {
198             listenerExecutor.shutdownNow();
199         }
200     }
201 }