2 * Copyright (c) 2014 Brocade Communications Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.yangtools.util.concurrent;
11 import static org.junit.Assert.*;
13 import java.util.concurrent.Callable;
14 import java.util.concurrent.CountDownLatch;
15 import java.util.concurrent.ExecutionException;
16 import java.util.concurrent.ExecutorService;
17 import java.util.concurrent.Executors;
18 import java.util.concurrent.TimeUnit;
19 import java.util.concurrent.atomic.AtomicReference;
21 import org.junit.After;
22 import org.junit.Before;
23 import org.junit.Test;
25 import com.google.common.base.Function;
26 import com.google.common.util.concurrent.FutureCallback;
27 import com.google.common.util.concurrent.Futures;
28 import com.google.common.util.concurrent.ListenableFuture;
29 import com.google.common.util.concurrent.ListeningExecutorService;
30 import com.google.common.util.concurrent.ThreadFactoryBuilder;
32 import static org.opendaylight.yangtools.util.concurrent.AsyncNotifyingListeningExecutorServiceTest.testListenerCallback;
33 import static org.opendaylight.yangtools.util.concurrent.CommonTestUtils.Invoker;
34 import static org.opendaylight.yangtools.util.concurrent.CommonTestUtils.SUBMIT_CALLABLE;
35 import static org.opendaylight.yangtools.util.concurrent.CommonTestUtils.SUBMIT_RUNNABLE;
36 import static org.opendaylight.yangtools.util.concurrent.CommonTestUtils.SUBMIT_RUNNABLE_WITH_RESULT;
39 * Unit tests for DeadlockDetectingListeningExecutorService.
41 * @author Thomas Pantelis
43 public class DeadlockDetectingListeningExecutorServiceTest {
45 interface InitialInvoker {
46 void invokeExecutor( ListeningExecutorService executor, Runnable task );
49 static final InitialInvoker SUBMIT = new InitialInvoker() {
51 public void invokeExecutor( ListeningExecutorService executor, Runnable task ) {
52 executor.submit( task );
56 static final InitialInvoker EXECUTE = new InitialInvoker() {
58 public void invokeExecutor( ListeningExecutorService executor, Runnable task ) {
59 executor.execute( task );
63 @SuppressWarnings("serial")
64 public static class TestDeadlockException extends Exception {
67 public static Function<Void, Exception> DEADLOCK_EXECUTOR_FUNCTION = new Function<Void, Exception>() {
69 public Exception apply( Void notUsed ) {
70 return new TestDeadlockException();
74 DeadlockDetectingListeningExecutorService executor;
81 public void tearDown() {
82 if( executor != null ) {
83 executor.shutdownNow();
87 DeadlockDetectingListeningExecutorService newExecutor() {
88 return new DeadlockDetectingListeningExecutorService( Executors.newSingleThreadExecutor(),
89 DEADLOCK_EXECUTOR_FUNCTION );
93 public void testBlockingSubmitOffExecutor() throws Exception {
95 executor = newExecutor();
97 // Test submit with Callable.
99 ListenableFuture<String> future = executor.submit( new Callable<String>() {
101 public String call() throws Exception{
106 assertEquals( "Future result", "foo", future.get( 5, TimeUnit.SECONDS ) );
108 // Test submit with Runnable.
110 executor.submit( new Runnable() {
116 // Test submit with Runnable and value.
118 future = executor.submit( new Runnable() {
124 assertEquals( "Future result", "foo", future.get( 5, TimeUnit.SECONDS ) );
128 public void testNonBlockingSubmitOnExecutorThread() throws Throwable {
130 executor = newExecutor();
132 testNonBlockingSubmitOnExecutorThread( SUBMIT, SUBMIT_CALLABLE );
133 testNonBlockingSubmitOnExecutorThread( SUBMIT, SUBMIT_RUNNABLE );
134 testNonBlockingSubmitOnExecutorThread( SUBMIT, SUBMIT_RUNNABLE_WITH_RESULT );
136 testNonBlockingSubmitOnExecutorThread( EXECUTE, SUBMIT_CALLABLE );
139 void testNonBlockingSubmitOnExecutorThread( InitialInvoker initialInvoker,
140 final Invoker invoker ) throws Throwable {
142 final AtomicReference<Throwable> caughtEx = new AtomicReference<>();
143 final CountDownLatch futureCompletedLatch = new CountDownLatch( 1 );
145 Runnable task = new Runnable() {
146 @SuppressWarnings({ "unchecked", "rawtypes" })
150 Futures.addCallback( invoker.invokeExecutor( executor, null ), new FutureCallback() {
152 public void onSuccess( Object result ) {
153 futureCompletedLatch.countDown();
157 public void onFailure( Throwable t ) {
159 futureCompletedLatch.countDown();
166 initialInvoker.invokeExecutor( executor, task );
168 assertTrue( "Task did not complete - executor likely deadlocked",
169 futureCompletedLatch.await( 5, TimeUnit.SECONDS ) );
171 if( caughtEx.get() != null ) {
172 throw caughtEx.get();
177 public void testBlockingSubmitOnExecutorThread() throws Exception {
179 executor = newExecutor();
181 testBlockingSubmitOnExecutorThread( SUBMIT, SUBMIT_CALLABLE );
182 testBlockingSubmitOnExecutorThread( SUBMIT, SUBMIT_RUNNABLE );
183 testBlockingSubmitOnExecutorThread( SUBMIT, SUBMIT_RUNNABLE_WITH_RESULT );
185 testBlockingSubmitOnExecutorThread( EXECUTE, SUBMIT_CALLABLE );
188 void testBlockingSubmitOnExecutorThread( InitialInvoker initialInvoker,
189 final Invoker invoker ) throws Exception {
191 final AtomicReference<Throwable> caughtEx = new AtomicReference<>();
192 final CountDownLatch latch = new CountDownLatch( 1 );
194 Runnable task = new Runnable() {
199 invoker.invokeExecutor( executor, null ).get();
200 } catch( ExecutionException e ) {
201 caughtEx.set( e.getCause() );
202 } catch( Throwable e ) {
211 initialInvoker.invokeExecutor( executor, task );
213 assertTrue( "Task did not complete - executor likely deadlocked",
214 latch.await( 5, TimeUnit.SECONDS ) );
216 assertNotNull( "Expected exception thrown", caughtEx.get() );
217 assertEquals( "Caught exception type", TestDeadlockException.class, caughtEx.get().getClass() );
221 public void testListenableFutureCallbackWithExecutor() throws InterruptedException {
223 String listenerThreadPrefix = "ListenerThread";
224 ExecutorService listenerExecutor = Executors.newFixedThreadPool( 1,
225 new ThreadFactoryBuilder().setNameFormat( listenerThreadPrefix + "-%d" ).build() );
227 executor = new DeadlockDetectingListeningExecutorService(
228 Executors.newSingleThreadExecutor(
229 new ThreadFactoryBuilder().setNameFormat( "SingleThread" ).build() ),
230 DEADLOCK_EXECUTOR_FUNCTION, listenerExecutor );
233 testListenerCallback( executor, SUBMIT_CALLABLE, listenerThreadPrefix );
234 testListenerCallback( executor, SUBMIT_RUNNABLE, listenerThreadPrefix );
235 testListenerCallback( executor, SUBMIT_RUNNABLE_WITH_RESULT, listenerThreadPrefix );
237 listenerExecutor.shutdownNow();