2 * Copyright (c) 2014 Brocade Communications Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.yangtools.util.concurrent;
11 import static org.junit.Assert.assertEquals;
12 import static org.junit.Assert.assertSame;
13 import static org.junit.Assert.assertTrue;
15 import java.util.List;
16 import java.util.concurrent.CountDownLatch;
17 import java.util.concurrent.ExecutorService;
18 import java.util.concurrent.Executors;
19 import java.util.concurrent.TimeUnit;
20 import java.util.concurrent.atomic.AtomicReference;
22 import org.junit.After;
23 import org.junit.Test;
25 import static org.mockito.Mockito.mock;
26 import static org.mockito.Mockito.verify;
27 import static org.mockito.Mockito.doNothing;
28 import static org.mockito.Mockito.doReturn;
30 import com.google.common.collect.Lists;
31 import com.google.common.util.concurrent.FutureCallback;
32 import com.google.common.util.concurrent.Futures;
33 import com.google.common.util.concurrent.ListenableFuture;
34 import com.google.common.util.concurrent.ThreadFactoryBuilder;
36 import static org.opendaylight.yangtools.util.concurrent.CommonTestUtils.Invoker;
37 import static org.opendaylight.yangtools.util.concurrent.CommonTestUtils.SUBMIT_CALLABLE;
38 import static org.opendaylight.yangtools.util.concurrent.CommonTestUtils.SUBMIT_RUNNABLE;
39 import static org.opendaylight.yangtools.util.concurrent.CommonTestUtils.SUBMIT_RUNNABLE_WITH_RESULT;
42 * Unit tests for AsyncNotifyingListeningExecutorService.
44 * @author Thomas Pantelis
46 public class AsyncNotifyingListeningExecutorServiceTest {
48 private ExecutorService listenerExecutor;
49 private AsyncNotifyingListeningExecutorService testExecutor;
52 public void tearDown() {
53 if (listenerExecutor != null ) {
54 listenerExecutor.shutdownNow();
57 if (testExecutor != null ) {
58 testExecutor.shutdownNow();
63 public void testListenerCallbackWithExecutor() throws InterruptedException {
65 String listenerThreadPrefix = "ListenerThread";
66 listenerExecutor = Executors.newFixedThreadPool( 3,
67 new ThreadFactoryBuilder().setNameFormat( listenerThreadPrefix + "-%d" ).build() );
69 testExecutor = new AsyncNotifyingListeningExecutorService(
70 Executors.newSingleThreadExecutor(
71 new ThreadFactoryBuilder().setNameFormat( "SingleThread" ).build() ),
74 testListenerCallback( testExecutor, SUBMIT_CALLABLE, listenerThreadPrefix );
75 testListenerCallback( testExecutor, SUBMIT_RUNNABLE, listenerThreadPrefix );
76 testListenerCallback( testExecutor, SUBMIT_RUNNABLE_WITH_RESULT, listenerThreadPrefix );
80 public void testListenerCallbackWithNoExecutor() throws InterruptedException {
82 String listenerThreadPrefix = "SingleThread";
83 testExecutor = new AsyncNotifyingListeningExecutorService(
84 Executors.newSingleThreadExecutor(
85 new ThreadFactoryBuilder().setNameFormat( listenerThreadPrefix ).build() ),
88 testListenerCallback( testExecutor, SUBMIT_CALLABLE, listenerThreadPrefix );
89 testListenerCallback( testExecutor, SUBMIT_RUNNABLE, listenerThreadPrefix );
90 testListenerCallback( testExecutor, SUBMIT_RUNNABLE_WITH_RESULT, listenerThreadPrefix );
93 static void testListenerCallback( AsyncNotifyingListeningExecutorService executor,
94 Invoker invoker, final String expListenerThreadPrefix ) throws InterruptedException {
96 AtomicReference<AssertionError> assertError = new AtomicReference<>();
97 CountDownLatch futureNotifiedLatch = new CountDownLatch( 1 );
98 CountDownLatch blockTaskLatch = new CountDownLatch( 1 );
100 // The blockTaskLatch is used to block the task from completing until we've added
101 // our listener to the Future. Otherwise, if the task completes quickly and the Future is
102 // set to done before we've added our listener, the call to ListenableFuture#addListener
103 // will immediately notify synchronously on this thread as Futures#addCallback defaults to
104 // a same thread executor. This would erroneously fail the test.
106 ListenableFuture<?> future = invoker.invokeExecutor( executor, blockTaskLatch );
107 addCallback( future, futureNotifiedLatch, expListenerThreadPrefix, assertError );
109 // Now that we've added our listener, signal the latch to let the task complete.
111 blockTaskLatch.countDown();
113 assertTrue( "ListenableFuture callback was not notified of onSuccess",
114 futureNotifiedLatch.await( 5, TimeUnit.SECONDS ) );
116 if (assertError.get() != null ) {
117 throw assertError.get();
120 // Add another listener - since the Future is already complete, we expect the listener to be
121 // notified inline on this thread when it's added.
123 futureNotifiedLatch = new CountDownLatch( 1 );
124 addCallback( future, futureNotifiedLatch, Thread.currentThread().getName(), assertError );
126 assertTrue( "ListenableFuture callback was not notified of onSuccess",
127 futureNotifiedLatch.await( 5, TimeUnit.SECONDS ) );
129 if (assertError.get() != null ) {
130 throw assertError.get();
134 static void addCallback( ListenableFuture<?> future,
135 final CountDownLatch futureNotifiedLatch,
136 final String expListenerThreadPrefix,
137 final AtomicReference<AssertionError> assertError ) {
139 Futures.addCallback( future, new FutureCallback<Object>() {
141 public void onSuccess( Object result ) {
144 String theadName = Thread.currentThread().getName();
145 assertTrue( "ListenableFuture callback was not notified on the listener executor."
146 + " Expected thread name prefix \"" + expListenerThreadPrefix +
147 "\". Actual thread name \"" + theadName + "\"",
148 theadName.startsWith( expListenerThreadPrefix ) );
149 } catch( AssertionError e ) {
150 assertError.set( e );
152 futureNotifiedLatch.countDown();
157 public void onFailure( Throwable t ) {
165 public void testDelegatedMethods() throws InterruptedException {
167 Runnable task = new Runnable() {
173 List<Runnable> taskList = Lists.newArrayList();
175 ExecutorService mockDelegate = mock( ExecutorService.class );
176 doNothing().when( mockDelegate ).execute( task );
177 doNothing().when( mockDelegate ).shutdown();
178 doReturn( taskList ).when( mockDelegate ).shutdownNow();
179 doReturn( true ).when( mockDelegate ).awaitTermination( 3, TimeUnit.SECONDS );
180 doReturn( true ).when( mockDelegate ).isShutdown();
181 doReturn( true ).when( mockDelegate ).isTerminated();
183 AsyncNotifyingListeningExecutorService executor = new AsyncNotifyingListeningExecutorService(
184 mockDelegate, null );
186 executor.execute( task );
188 assertEquals( "awaitTermination", true, executor.awaitTermination( 3, TimeUnit.SECONDS ) );
189 assertSame( "shutdownNow", taskList, executor.shutdownNow() );
190 assertEquals( "isShutdown", true, executor.isShutdown() );
191 assertEquals( "isTerminated", true, executor.isTerminated() );
193 verify( mockDelegate ).execute( task );
194 verify( mockDelegate ).shutdown();
195 verify( mockDelegate ).awaitTermination( 3, TimeUnit.SECONDS );
196 verify( mockDelegate ).shutdownNow();
197 verify( mockDelegate ).isShutdown();
198 verify( mockDelegate ).isTerminated();