2 * Copyright (c) 2014 Brocade Communications Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.yangtools.util.concurrent;
11 import static org.junit.Assert.assertEquals;
12 import static org.junit.Assert.assertNotNull;
13 import static org.junit.Assert.assertTrue;
14 import static org.opendaylight.yangtools.util.concurrent.AsyncNotifyingListeningExecutorServiceTest.testListenerCallback;
15 import static org.opendaylight.yangtools.util.concurrent.CommonTestUtils.SUBMIT_CALLABLE;
16 import static org.opendaylight.yangtools.util.concurrent.CommonTestUtils.SUBMIT_RUNNABLE;
17 import static org.opendaylight.yangtools.util.concurrent.CommonTestUtils.SUBMIT_RUNNABLE_WITH_RESULT;
18 import com.google.common.base.Supplier;
19 import com.google.common.util.concurrent.FutureCallback;
20 import com.google.common.util.concurrent.Futures;
21 import com.google.common.util.concurrent.ListenableFuture;
22 import com.google.common.util.concurrent.ListeningExecutorService;
23 import com.google.common.util.concurrent.ThreadFactoryBuilder;
24 import java.util.concurrent.Callable;
25 import java.util.concurrent.CountDownLatch;
26 import java.util.concurrent.ExecutionException;
27 import java.util.concurrent.ExecutorService;
28 import java.util.concurrent.Executors;
29 import java.util.concurrent.TimeUnit;
30 import java.util.concurrent.atomic.AtomicReference;
31 import org.junit.After;
32 import org.junit.Before;
33 import org.junit.Test;
34 import org.opendaylight.yangtools.util.concurrent.CommonTestUtils.Invoker;
37 * Unit tests for DeadlockDetectingListeningExecutorService.
39 * @author Thomas Pantelis
41 public class DeadlockDetectingListeningExecutorServiceTest {
// Strategy for handing the first (outer) task to the executor under test.
// The SUBMIT and EXECUTE constants in this class are its implementations.
43 interface InitialInvoker {
// Passes task to executor; the mechanism is chosen by the implementation.
44 void invokeExecutor( ListeningExecutorService executor, Runnable task );
// InitialInvoker that hands the task over via executor.submit(); the future
// returned by submit() is not used.
47 static final InitialInvoker SUBMIT = new InitialInvoker() {
49 public void invokeExecutor( final ListeningExecutorService executor, final Runnable task ) {
50 executor.submit( task );
// InitialInvoker that hands the task over via executor.execute().
54 static final InitialInvoker EXECUTE = new InitialInvoker() {
56 public void invokeExecutor( final ListeningExecutorService executor, final Runnable task ) {
57 executor.execute( task );
61 @SuppressWarnings("serial")
62 public static class TestDeadlockException extends Exception {
// Supplier passed to every DeadlockDetectingListeningExecutorService built in
// this class; each get() call creates a new TestDeadlockException.
// NOTE(review): presumably the executor calls this supplier when it detects a
// blocking call made from its own thread - confirm against the executor class.
65 private static final Supplier<Exception> DEADLOCK_EXECUTOR_SUPPLIER = new Supplier<Exception>() {
67 public Exception get() {
68 return new TestDeadlockException();
// Executor under test. Each test method assigns it; tearDown() shuts it down
// when it is non-null.
72 DeadlockDetectingListeningExecutorService executor;
// Shuts down the executor created by a test, if one was created.
// NOTE(review): presumably annotated @After (the annotation line is not
// visible in this view) - confirm.
79 public void tearDown() {
// A test may never have assigned the field, hence the null guard.
80 if( executor != null ) {
81 executor.shutdownNow();
// Builds the executor under test: a DeadlockDetectingListeningExecutorService
// wrapping a new single-thread executor, with DEADLOCK_EXECUTOR_SUPPLIER as
// its exception supplier. No separate listener executor is passed here.
85 DeadlockDetectingListeningExecutorService newExecutor() {
86 return new DeadlockDetectingListeningExecutorService( Executors.newSingleThreadExecutor(),
87 DEADLOCK_EXECUTOR_SUPPLIER );
// Calls the three submit() overloads directly from the test method, i.e. not
// from inside a task running on the executor, using a fresh executor from
// newExecutor().
91 public void testBlockingSubmitOffExecutor() throws Exception {
93 executor = newExecutor();
95 // Test submit with Callable.
97 ListenableFuture<String> future = executor.submit( new Callable<String>() {
99 public String call() throws Exception{
// The future from the Callable overload must yield "foo" within 5 seconds.
104 assertEquals( "Future result", "foo", future.get( 5, TimeUnit.SECONDS ) );
106 // Test submit with Runnable.
108 executor.submit( new Runnable() {
114 // Test submit with Runnable and value.
116 future = executor.submit( new Runnable() {
// The future from the Runnable-with-value overload must also yield "foo"
// within 5 seconds.
122 assertEquals( "Future result", "foo", future.get( 5, TimeUnit.SECONDS ) );
// Runs the non-blocking helper once per pairing listed below, all against a
// single executor obtained from newExecutor().
126 public void testNonBlockingSubmitOnExecutorThread() throws Throwable {
128 executor = newExecutor();
// Outer task handed over with submit(), combined with each of the three
// Invoker constants imported from CommonTestUtils.
130 testNonBlockingSubmitOnExecutorThread( SUBMIT, SUBMIT_CALLABLE );
131 testNonBlockingSubmitOnExecutorThread( SUBMIT, SUBMIT_RUNNABLE );
132 testNonBlockingSubmitOnExecutorThread( SUBMIT, SUBMIT_RUNNABLE_WITH_RESULT );
// Outer task handed over with execute(); only the SUBMIT_CALLABLE invoker is
// exercised on this path.
134 testNonBlockingSubmitOnExecutorThread( EXECUTE, SUBMIT_CALLABLE );
// Hands an outer task to the executor via initialInvoker. That task uses
// invoker to make a nested call on the same executor and attaches a callback
// to the returned future rather than blocking on it. The helper fails if the
// callback has not fired within 5 seconds, and rethrows whatever Throwable
// ends up in caughtEx.
137 void testNonBlockingSubmitOnExecutorThread( final InitialInvoker initialInvoker,
138 final Invoker invoker ) throws Throwable {
// Holder for a failure to be rethrown on the test thread.
140 final AtomicReference<Throwable> caughtEx = new AtomicReference<>();
// Counted down from both callback methods below.
141 final CountDownLatch futureCompletedLatch = new CountDownLatch( 1 );
143 Runnable task = new Runnable() {
144 @SuppressWarnings({ "unchecked", "rawtypes" })
// The second argument to invoker is null; its meaning is defined by Invoker in
// CommonTestUtils, which is not visible here.
148 Futures.addCallback( invoker.invokeExecutor( executor, null ), new FutureCallback() {
150 public void onSuccess( final Object result ) {
151 futureCompletedLatch.countDown();
// NOTE(review): presumably t is stored into caughtEx before the countDown (the
// line between is not visible in this view) - confirm.
155 public void onFailure( final Throwable t ) {
157 futureCompletedLatch.countDown();
164 initialInvoker.invokeExecutor( executor, task );
// A timeout here means neither callback method ran within 5 seconds.
166 assertTrue( "Task did not complete - executor likely deadlocked",
167 futureCompletedLatch.await( 5, TimeUnit.SECONDS ) );
// Surface any recorded failure as the test's own failure.
169 if( caughtEx.get() != null ) {
170 throw caughtEx.get();
// Runs the blocking helper once per pairing listed below, all against a single
// executor obtained from newExecutor().
175 public void testBlockingSubmitOnExecutorThread() throws Exception {
177 executor = newExecutor();
// Outer task handed over with submit(), combined with each of the three
// Invoker constants imported from CommonTestUtils.
179 testBlockingSubmitOnExecutorThread( SUBMIT, SUBMIT_CALLABLE );
180 testBlockingSubmitOnExecutorThread( SUBMIT, SUBMIT_RUNNABLE );
181 testBlockingSubmitOnExecutorThread( SUBMIT, SUBMIT_RUNNABLE_WITH_RESULT );
// Outer task handed over with execute(); only the SUBMIT_CALLABLE invoker is
// exercised on this path.
183 testBlockingSubmitOnExecutorThread( EXECUTE, SUBMIT_CALLABLE );
// Hands an outer task to the executor via initialInvoker. That task uses
// invoker to make a nested call on the same executor and then calls get() on
// the returned future. The helper requires the task to finish within 5
// seconds and requires the captured exception to be exactly a
// TestDeadlockException.
186 void testBlockingSubmitOnExecutorThread( final InitialInvoker initialInvoker,
187 final Invoker invoker ) throws Exception {
// Holder for the exception seen inside the task, checked on the test thread.
189 final AtomicReference<Throwable> caughtEx = new AtomicReference<>();
// NOTE(review): the countDown() call for this latch is not visible in this
// view; presumably it sits in the task's finally block - confirm.
190 final CountDownLatch latch = new CountDownLatch( 1 );
192 Runnable task = new Runnable() {
// Untimed get() on the nested future; the second invoker argument is null.
197 invoker.invokeExecutor( executor, null ).get();
// For an ExecutionException the wrapped cause is recorded, not the wrapper.
198 } catch( ExecutionException e ) {
199 caughtEx.set( e.getCause() );
200 } catch( Throwable e ) {
209 initialInvoker.invokeExecutor( executor, task );
// A timeout here means the latch was not released within 5 seconds.
211 assertTrue( "Task did not complete - executor likely deadlocked",
212 latch.await( 5, TimeUnit.SECONDS ) );
// An exception must have been recorded, and it must be exactly
// TestDeadlockException (class equality, so subclasses do not match).
214 assertNotNull( "Expected exception thrown", caughtEx.get() );
215 assertEquals( "Caught exception type", TestDeadlockException.class, caughtEx.get().getClass() );
// Builds the executor under test with a separate listener executor and runs
// the shared testListenerCallback check (statically imported from
// AsyncNotifyingListeningExecutorServiceTest) for each of the three Invoker
// constants.
219 public void testListenableFutureCallbackWithExecutor() throws InterruptedException {
// Threads of the listener pool are named "ListenerThread-<n>".
221 String listenerThreadPrefix = "ListenerThread";
222 ExecutorService listenerExecutor = Executors.newFixedThreadPool( 1,
223 new ThreadFactoryBuilder().setNameFormat( listenerThreadPrefix + "-%d" ).build() );
// Three-argument constructor: a single worker thread named "SingleThread",
// the shared exception supplier, and the listener executor created above.
225 executor = new DeadlockDetectingListeningExecutorService(
226 Executors.newSingleThreadExecutor(
227 new ThreadFactoryBuilder().setNameFormat( "SingleThread" ).build() ),
228 DEADLOCK_EXECUTOR_SUPPLIER, listenerExecutor );
// NOTE(review): presumably testListenerCallback verifies that future callbacks
// run on a thread whose name starts with listenerThreadPrefix - confirm in
// AsyncNotifyingListeningExecutorServiceTest.
231 testListenerCallback( executor, SUBMIT_CALLABLE, listenerThreadPrefix );
232 testListenerCallback( executor, SUBMIT_RUNNABLE, listenerThreadPrefix );
233 testListenerCallback( executor, SUBMIT_RUNNABLE_WITH_RESULT, listenerThreadPrefix );
// The listener pool is local to this test and is not covered by tearDown(),
// so it is shut down here.
235 listenerExecutor.shutdownNow();