2 * Copyright (c) 2014 Brocade Communications Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.yangtools.util.concurrent;
11 import static org.junit.Assert.assertEquals;
12 import static org.junit.Assert.assertNotNull;
13 import static org.junit.Assert.assertTrue;
14 import static org.opendaylight.yangtools.util.concurrent.AsyncNotifyingListeningExecutorServiceTest.testListenerCallback;
15 import static org.opendaylight.yangtools.util.concurrent.CommonTestUtils.SUBMIT_CALLABLE;
16 import static org.opendaylight.yangtools.util.concurrent.CommonTestUtils.SUBMIT_RUNNABLE;
17 import static org.opendaylight.yangtools.util.concurrent.CommonTestUtils.SUBMIT_RUNNABLE_WITH_RESULT;
19 import com.google.common.base.Function;
20 import com.google.common.util.concurrent.FutureCallback;
21 import com.google.common.util.concurrent.Futures;
22 import com.google.common.util.concurrent.ListenableFuture;
23 import com.google.common.util.concurrent.ListeningExecutorService;
24 import com.google.common.util.concurrent.ThreadFactoryBuilder;
26 import java.util.concurrent.Callable;
27 import java.util.concurrent.CountDownLatch;
28 import java.util.concurrent.ExecutionException;
29 import java.util.concurrent.ExecutorService;
30 import java.util.concurrent.Executors;
31 import java.util.concurrent.TimeUnit;
32 import java.util.concurrent.atomic.AtomicReference;
34 import org.junit.After;
35 import org.junit.Before;
36 import org.junit.Test;
37 import org.opendaylight.yangtools.util.concurrent.CommonTestUtils.Invoker;
40 * Unit tests for DeadlockDetectingListeningExecutorService.
42 * @author Thomas Pantelis
44 public class DeadlockDetectingListeningExecutorServiceTest {
// Strategy for handing the initial (outer) task to the executor under test.
// The SUBMIT and EXECUTE constants of this class are its two implementations.
46 interface InitialInvoker {
// Hands the given task to the given executor; nothing is returned to the caller.
47 void invokeExecutor( ListeningExecutorService executor, Runnable task );
// InitialInvoker that hands the task to the executor through submit().
50 static final InitialInvoker SUBMIT = new InitialInvoker() {
52 public void invokeExecutor( final ListeningExecutorService executor, final Runnable task ) {
// The future returned by submit() is intentionally not kept.
53 executor.submit( task );
// InitialInvoker that hands the task to the executor through execute(),
// i.e. the fire-and-forget entry point that yields no future.
57 static final InitialInvoker EXECUTE = new InitialInvoker() {
59 public void invokeExecutor( final ListeningExecutorService executor, final Runnable task ) {
60 executor.execute( task );
// Exception type created by DEADLOCK_EXECUTOR_FUNCTION. The blocking-submit test
// asserts that exactly this class is what ends up being reported.
// "serial" is suppressed presumably because no serialVersionUID is declared - confirm.
64 @SuppressWarnings("serial")
65 public static class TestDeadlockException extends Exception {
// Function handed to the DeadlockDetectingListeningExecutorService constructor by this
// test. It ignores its argument and returns a new TestDeadlockException on every call.
// NOTE(review): the field is public, static and not final, so it can be reassigned from
// outside - confirm whether it should be declared final.
68 public static Function<Void, Exception> DEADLOCK_EXECUTOR_FUNCTION = new Function<Void, Exception>() {
70 public Exception apply( final Void notUsed ) {
71 return new TestDeadlockException();
// Executor under test. Each test method assigns it; tearDown() shuts it down when non-null.
75 DeadlockDetectingListeningExecutorService executor;
// Stops the executor assigned by the test that just ran, if there is one.
// NOTE(review): presumably annotated with JUnit's @After (imported by this file) - confirm.
82 public void tearDown() {
// Guard against the field never having been assigned.
83 if( executor != null ) {
84 executor.shutdownNow();
// Builds the executor under test: a DeadlockDetectingListeningExecutorService wrapping a
// freshly created single-thread executor, with DEADLOCK_EXECUTOR_FUNCTION as the second
// constructor argument.
88 DeadlockDetectingListeningExecutorService newExecutor() {
89 return new DeadlockDetectingListeningExecutorService( Executors.newSingleThreadExecutor(),
90 DEADLOCK_EXECUTOR_FUNCTION );
// Submits tasks directly from this method (not from inside a task already running on the
// executor) and waits on the returned futures, expecting normal results.
94 public void testBlockingSubmitOffExecutor() throws Exception {
96 executor = newExecutor();
98 // Test submit with Callable.
100 ListenableFuture<String> future = executor.submit( new Callable<String>() {
102 public String call() throws Exception{
// Wait at most 5 seconds for the Callable's result, which must be "foo".
107 assertEquals( "Future result", "foo", future.get( 5, TimeUnit.SECONDS ) );
109 // Test submit with Runnable.
// The result of this submission is not assigned to the future variable.
111 executor.submit( new Runnable() {
117 // Test submit with Runnable and value.
// The future variable is reused for this third submission.
119 future = executor.submit( new Runnable() {
// Same bounded wait and expected value as for the Callable case.
125 assertEquals( "Future result", "foo", future.get( 5, TimeUnit.SECONDS ) );
// Runs the non-blocking scenario once per combination listed below, all against the same
// executor instance. The first argument selects how the outer task is handed to the
// executor; the second selects how the helper's task calls the executor again.
129 public void testNonBlockingSubmitOnExecutorThread() throws Throwable {
131 executor = newExecutor();
// Outer task handed over with submit(), combined with each of the three invokers.
133 testNonBlockingSubmitOnExecutorThread( SUBMIT, SUBMIT_CALLABLE );
134 testNonBlockingSubmitOnExecutorThread( SUBMIT, SUBMIT_RUNNABLE );
135 testNonBlockingSubmitOnExecutorThread( SUBMIT, SUBMIT_RUNNABLE_WITH_RESULT );
// Outer task handed over with execute(); only SUBMIT_CALLABLE is paired with it.
137 testNonBlockingSubmitOnExecutorThread( EXECUTE, SUBMIT_CALLABLE );
// Hands a task to the executor via initialInvoker. That task calls the executor again via
// invoker and attaches a callback to the returned future instead of blocking on it.
// Fails if the callback has not fired within 5 seconds; afterwards any Throwable held in
// caughtEx is thrown from this method.
140 void testNonBlockingSubmitOnExecutorThread( final InitialInvoker initialInvoker,
141 final Invoker invoker ) throws Throwable {
// Read after the latch wait below; a non-null value is rethrown to fail the test.
143 final AtomicReference<Throwable> caughtEx = new AtomicReference<>();
// Counted down from both onSuccess and onFailure of the callback.
144 final CountDownLatch futureCompletedLatch = new CountDownLatch( 1 );
146 Runnable task = new Runnable() {
// The suppressions cover the raw FutureCallback type used below.
147 @SuppressWarnings({ "unchecked", "rawtypes" })
// Register a callback on the future rather than calling get() on it.
151 Futures.addCallback( invoker.invokeExecutor( executor, null ), new FutureCallback() {
153 public void onSuccess( final Object result ) {
154 futureCompletedLatch.countDown();
// NOTE(review): presumably the failure is also recorded in caughtEx here - confirm.
158 public void onFailure( final Throwable t ) {
160 futureCompletedLatch.countDown();
// Start the outer task.
167 initialInvoker.invokeExecutor( executor, task );
// A timeout here means neither callback method ran within 5 seconds.
169 assertTrue( "Task did not complete - executor likely deadlocked",
170 futureCompletedLatch.await( 5, TimeUnit.SECONDS ) );
// Surface anything stored in caughtEx as the test's own failure.
172 if( caughtEx.get() != null ) {
173 throw caughtEx.get();
// Runs the blocking scenario once per combination listed below, all against the same
// executor instance. The first argument selects how the outer task is handed to the
// executor; the second selects how the helper's task calls the executor again.
178 public void testBlockingSubmitOnExecutorThread() throws Exception {
180 executor = newExecutor();
// Outer task handed over with submit(), combined with each of the three invokers.
182 testBlockingSubmitOnExecutorThread( SUBMIT, SUBMIT_CALLABLE );
183 testBlockingSubmitOnExecutorThread( SUBMIT, SUBMIT_RUNNABLE );
184 testBlockingSubmitOnExecutorThread( SUBMIT, SUBMIT_RUNNABLE_WITH_RESULT );
// Outer task handed over with execute(); only SUBMIT_CALLABLE is paired with it.
186 testBlockingSubmitOnExecutorThread( EXECUTE, SUBMIT_CALLABLE );
// Hands a task to the executor via initialInvoker. That task calls the executor again via
// invoker and blocks on the returned future with get(). The test then requires that a
// TestDeadlockException has been recorded in caughtEx.
189 void testBlockingSubmitOnExecutorThread( final InitialInvoker initialInvoker,
190 final Invoker invoker ) throws Exception {
// Receives the failure observed by the task so it can be checked after the latch wait.
192 final AtomicReference<Throwable> caughtEx = new AtomicReference<>();
// NOTE(review): presumably counted down by the task once it has finished - confirm.
193 final CountDownLatch latch = new CountDownLatch( 1 );
195 Runnable task = new Runnable() {
// Blocking get() without a timeout on the future returned by the nested call.
200 invoker.invokeExecutor( executor, null ).get();
// Record the cause of the ExecutionException, not the wrapper itself.
201 } catch( ExecutionException e ) {
202 caughtEx.set( e.getCause() );
203 } catch( Throwable e ) {
// Start the outer task.
212 initialInvoker.invokeExecutor( executor, task );
// Bounded wait so that a real deadlock fails the test instead of hanging it.
214 assertTrue( "Task did not complete - executor likely deadlocked",
215 latch.await( 5, TimeUnit.SECONDS ) );
// The recorded exception must be exactly TestDeadlockException (class equality, not instanceof).
217 assertNotNull( "Expected exception thrown", caughtEx.get() );
218 assertEquals( "Caught exception type", TestDeadlockException.class, caughtEx.get().getClass() );
// Builds the executor with a separate listener executor as the third constructor argument
// and runs testListenerCallback for each of the three submit variants, passing the
// listener thread name prefix.
222 public void testListenableFutureCallbackWithExecutor() throws InterruptedException {
224 String listenerThreadPrefix = "ListenerThread";
// One-thread pool whose threads are named from the prefix plus "-%d".
225 ExecutorService listenerExecutor = Executors.newFixedThreadPool( 1,
226 new ThreadFactoryBuilder().setNameFormat( listenerThreadPrefix + "-%d" ).build() );
// The backing single-thread executor uses the fixed thread name "SingleThread".
228 executor = new DeadlockDetectingListeningExecutorService(
229 Executors.newSingleThreadExecutor(
230 new ThreadFactoryBuilder().setNameFormat( "SingleThread" ).build() ),
231 DEADLOCK_EXECUTOR_FUNCTION, listenerExecutor );
// testListenerCallback is statically imported from AsyncNotifyingListeningExecutorServiceTest.
// NOTE(review): presumably it checks that callbacks run on a thread named with the prefix - confirm.
234 testListenerCallback( executor, SUBMIT_CALLABLE, listenerThreadPrefix );
235 testListenerCallback( executor, SUBMIT_RUNNABLE, listenerThreadPrefix );
236 testListenerCallback( executor, SUBMIT_RUNNABLE_WITH_RESULT, listenerThreadPrefix );
// Stop the listener pool created at the top of this method.
238 listenerExecutor.shutdownNow();