2 * Copyright (c) 2014 Brocade Communications Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.yangtools.util.concurrent;
11 import static org.junit.Assert.assertEquals;
12 import static org.junit.Assert.assertNotNull;
13 import static org.junit.Assert.assertTrue;
14 import static org.opendaylight.yangtools.util.concurrent.AsyncNotifyingListeningExecutorServiceTest.testListenerCallback;
15 import static org.opendaylight.yangtools.util.concurrent.CommonTestUtils.SUBMIT_CALLABLE;
16 import static org.opendaylight.yangtools.util.concurrent.CommonTestUtils.SUBMIT_RUNNABLE;
17 import static org.opendaylight.yangtools.util.concurrent.CommonTestUtils.SUBMIT_RUNNABLE_WITH_RESULT;
19 import com.google.common.base.Supplier;
20 import com.google.common.util.concurrent.FutureCallback;
21 import com.google.common.util.concurrent.Futures;
22 import com.google.common.util.concurrent.ListenableFuture;
23 import com.google.common.util.concurrent.ListeningExecutorService;
24 import com.google.common.util.concurrent.ThreadFactoryBuilder;
25 import java.util.concurrent.Callable;
26 import java.util.concurrent.CountDownLatch;
27 import java.util.concurrent.ExecutionException;
28 import java.util.concurrent.ExecutorService;
29 import java.util.concurrent.Executors;
30 import java.util.concurrent.TimeUnit;
31 import java.util.concurrent.atomic.AtomicReference;
32 import org.junit.After;
33 import org.junit.Before;
34 import org.junit.Test;
35 import org.opendaylight.yangtools.util.concurrent.CommonTestUtils.Invoker;
38 * Unit tests for DeadlockDetectingListeningExecutorService.
40 * @author Thomas Pantelis
42 public class DeadlockDetectingListeningExecutorServiceTest {
44 interface InitialInvoker {
45 void invokeExecutor( ListeningExecutorService executor, Runnable task );
48 static final InitialInvoker SUBMIT = new InitialInvoker() {
50 public void invokeExecutor( final ListeningExecutorService executor, final Runnable task ) {
51 executor.submit( task );
55 static final InitialInvoker EXECUTE = new InitialInvoker() {
57 public void invokeExecutor( final ListeningExecutorService executor, final Runnable task ) {
58 executor.execute( task );
62 @SuppressWarnings("serial")
63 public static class TestDeadlockException extends Exception {
66 private static final Supplier<Exception> DEADLOCK_EXECUTOR_SUPPLIER = new Supplier<Exception>() {
68 public Exception get() {
69 return new TestDeadlockException();
73 DeadlockDetectingListeningExecutorService executor;
80 public void tearDown() {
81 if (executor != null ) {
82 executor.shutdownNow();
86 DeadlockDetectingListeningExecutorService newExecutor() {
87 return new DeadlockDetectingListeningExecutorService( Executors.newSingleThreadExecutor(),
88 DEADLOCK_EXECUTOR_SUPPLIER );
92 public void testBlockingSubmitOffExecutor() throws Exception {
94 executor = newExecutor();
96 // Test submit with Callable.
98 ListenableFuture<String> future = executor.submit( new Callable<String>() {
100 public String call() throws Exception{
105 assertEquals( "Future result", "foo", future.get( 5, TimeUnit.SECONDS ) );
107 // Test submit with Runnable.
109 executor.submit( new Runnable() {
115 // Test submit with Runnable and value.
117 future = executor.submit( new Runnable() {
123 assertEquals( "Future result", "foo", future.get( 5, TimeUnit.SECONDS ) );
127 public void testNonBlockingSubmitOnExecutorThread() throws Throwable {
129 executor = newExecutor();
131 testNonBlockingSubmitOnExecutorThread( SUBMIT, SUBMIT_CALLABLE );
132 testNonBlockingSubmitOnExecutorThread( SUBMIT, SUBMIT_RUNNABLE );
133 testNonBlockingSubmitOnExecutorThread( SUBMIT, SUBMIT_RUNNABLE_WITH_RESULT );
135 testNonBlockingSubmitOnExecutorThread( EXECUTE, SUBMIT_CALLABLE );
138 void testNonBlockingSubmitOnExecutorThread( final InitialInvoker initialInvoker,
139 final Invoker invoker ) throws Throwable {
141 final AtomicReference<Throwable> caughtEx = new AtomicReference<>();
142 final CountDownLatch futureCompletedLatch = new CountDownLatch( 1 );
144 Runnable task = new Runnable() {
145 @SuppressWarnings({ "unchecked", "rawtypes" })
149 Futures.addCallback( invoker.invokeExecutor( executor, null ), new FutureCallback() {
151 public void onSuccess( final Object result ) {
152 futureCompletedLatch.countDown();
156 public void onFailure( final Throwable t ) {
158 futureCompletedLatch.countDown();
165 initialInvoker.invokeExecutor( executor, task );
167 assertTrue( "Task did not complete - executor likely deadlocked",
168 futureCompletedLatch.await( 5, TimeUnit.SECONDS ) );
170 if (caughtEx.get() != null ) {
171 throw caughtEx.get();
176 public void testBlockingSubmitOnExecutorThread() throws Exception {
178 executor = newExecutor();
180 testBlockingSubmitOnExecutorThread( SUBMIT, SUBMIT_CALLABLE );
181 testBlockingSubmitOnExecutorThread( SUBMIT, SUBMIT_RUNNABLE );
182 testBlockingSubmitOnExecutorThread( SUBMIT, SUBMIT_RUNNABLE_WITH_RESULT );
184 testBlockingSubmitOnExecutorThread( EXECUTE, SUBMIT_CALLABLE );
187 void testBlockingSubmitOnExecutorThread( final InitialInvoker initialInvoker,
188 final Invoker invoker ) throws Exception {
190 final AtomicReference<Throwable> caughtEx = new AtomicReference<>();
191 final CountDownLatch latch = new CountDownLatch( 1 );
193 Runnable task = new Runnable() {
198 invoker.invokeExecutor( executor, null ).get();
199 } catch( ExecutionException e ) {
200 caughtEx.set( e.getCause() );
201 } catch( Throwable e ) {
210 initialInvoker.invokeExecutor( executor, task );
212 assertTrue( "Task did not complete - executor likely deadlocked",
213 latch.await( 5, TimeUnit.SECONDS ) );
215 assertNotNull( "Expected exception thrown", caughtEx.get() );
216 assertEquals( "Caught exception type", TestDeadlockException.class, caughtEx.get().getClass() );
220 public void testListenableFutureCallbackWithExecutor() throws InterruptedException {
222 String listenerThreadPrefix = "ListenerThread";
223 ExecutorService listenerExecutor = Executors.newFixedThreadPool( 1,
224 new ThreadFactoryBuilder().setNameFormat( listenerThreadPrefix + "-%d" ).build() );
226 executor = new DeadlockDetectingListeningExecutorService(
227 Executors.newSingleThreadExecutor(
228 new ThreadFactoryBuilder().setNameFormat( "SingleThread" ).build() ),
229 DEADLOCK_EXECUTOR_SUPPLIER, listenerExecutor );
232 testListenerCallback( executor, SUBMIT_CALLABLE, listenerThreadPrefix );
233 testListenerCallback( executor, SUBMIT_RUNNABLE, listenerThreadPrefix );
234 testListenerCallback( executor, SUBMIT_RUNNABLE_WITH_RESULT, listenerThreadPrefix );
236 listenerExecutor.shutdownNow();