2 * Copyright (c) 2014 Brocade Communications Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.yangtools.util.concurrent;
11 import static org.junit.Assert.assertEquals;
12 import static org.junit.Assert.assertSame;
13 import static org.junit.Assert.assertTrue;
14 import static org.mockito.Mockito.doNothing;
15 import static org.mockito.Mockito.doReturn;
16 import static org.mockito.Mockito.mock;
17 import static org.mockito.Mockito.verify;
18 import static org.opendaylight.yangtools.util.concurrent.CommonTestUtils.SUBMIT_CALLABLE;
19 import static org.opendaylight.yangtools.util.concurrent.CommonTestUtils.SUBMIT_RUNNABLE;
20 import static org.opendaylight.yangtools.util.concurrent.CommonTestUtils.SUBMIT_RUNNABLE_WITH_RESULT;
22 import com.google.common.collect.Lists;
23 import com.google.common.util.concurrent.FutureCallback;
24 import com.google.common.util.concurrent.Futures;
25 import com.google.common.util.concurrent.ListenableFuture;
26 import com.google.common.util.concurrent.ThreadFactoryBuilder;
27 import java.util.List;
28 import java.util.concurrent.CountDownLatch;
29 import java.util.concurrent.ExecutorService;
30 import java.util.concurrent.Executors;
31 import java.util.concurrent.TimeUnit;
32 import java.util.concurrent.atomic.AtomicReference;
33 import javax.annotation.Nonnull;
34 import org.junit.After;
35 import org.junit.Test;
36 import org.opendaylight.yangtools.util.concurrent.CommonTestUtils.Invoker;
/**
 * Unit tests for AsyncNotifyingListeningExecutorService.
 *
 * @author Thomas Pantelis
 */
public class AsyncNotifyingListeningExecutorServiceTest {

    // Thread pool created by tests that need named listener threads; shut down in tearDown().
    private ExecutorService listenerExecutor;

    // The executor service instance under test; shut down in tearDown().
    private AsyncNotifyingListeningExecutorService testExecutor;
49 public void tearDown() {
50 if (listenerExecutor != null ) {
51 listenerExecutor.shutdownNow();
54 if (testExecutor != null ) {
55 testExecutor.shutdownNow();
60 public void testListenerCallbackWithExecutor() throws InterruptedException {
62 String listenerThreadPrefix = "ListenerThread";
63 listenerExecutor = Executors.newFixedThreadPool( 3,
64 new ThreadFactoryBuilder().setNameFormat( listenerThreadPrefix + "-%d" ).build() );
66 testExecutor = new AsyncNotifyingListeningExecutorService(
67 Executors.newSingleThreadExecutor(
68 new ThreadFactoryBuilder().setNameFormat( "SingleThread" ).build() ),
71 testListenerCallback( testExecutor, SUBMIT_CALLABLE, listenerThreadPrefix );
72 testListenerCallback( testExecutor, SUBMIT_RUNNABLE, listenerThreadPrefix );
73 testListenerCallback( testExecutor, SUBMIT_RUNNABLE_WITH_RESULT, listenerThreadPrefix );
77 public void testListenerCallbackWithNoExecutor() throws InterruptedException {
79 String listenerThreadPrefix = "SingleThread";
80 testExecutor = new AsyncNotifyingListeningExecutorService(
81 Executors.newSingleThreadExecutor(
82 new ThreadFactoryBuilder().setNameFormat( listenerThreadPrefix ).build() ),
85 testListenerCallback( testExecutor, SUBMIT_CALLABLE, listenerThreadPrefix );
86 testListenerCallback( testExecutor, SUBMIT_RUNNABLE, listenerThreadPrefix );
87 testListenerCallback( testExecutor, SUBMIT_RUNNABLE_WITH_RESULT, listenerThreadPrefix );
90 static void testListenerCallback( AsyncNotifyingListeningExecutorService executor,
91 Invoker invoker, final String expListenerThreadPrefix ) throws InterruptedException {
93 AtomicReference<AssertionError> assertError = new AtomicReference<>();
94 CountDownLatch futureNotifiedLatch = new CountDownLatch( 1 );
95 CountDownLatch blockTaskLatch = new CountDownLatch( 1 );
97 // The blockTaskLatch is used to block the task from completing until we've added
98 // our listener to the Future. Otherwise, if the task completes quickly and the Future is
99 // set to done before we've added our listener, the call to ListenableFuture#addListener
100 // will immediately notify synchronously on this thread as Futures#addCallback defaults to
101 // a same thread executor. This would erroneously fail the test.
103 ListenableFuture<?> future = invoker.invokeExecutor( executor, blockTaskLatch );
104 addCallback( future, futureNotifiedLatch, expListenerThreadPrefix, assertError );
106 // Now that we've added our listener, signal the latch to let the task complete.
108 blockTaskLatch.countDown();
110 assertTrue( "ListenableFuture callback was not notified of onSuccess",
111 futureNotifiedLatch.await( 5, TimeUnit.SECONDS ) );
113 if (assertError.get() != null ) {
114 throw assertError.get();
117 // Add another listener - since the Future is already complete, we expect the listener to be
118 // notified inline on this thread when it's added.
120 futureNotifiedLatch = new CountDownLatch( 1 );
121 addCallback( future, futureNotifiedLatch, Thread.currentThread().getName(), assertError );
123 assertTrue( "ListenableFuture callback was not notified of onSuccess",
124 futureNotifiedLatch.await( 5, TimeUnit.SECONDS ) );
126 if (assertError.get() != null ) {
127 throw assertError.get();
131 static void addCallback( ListenableFuture<?> future,
132 final CountDownLatch futureNotifiedLatch,
133 final String expListenerThreadPrefix,
134 final AtomicReference<AssertionError> assertError ) {
136 Futures.addCallback( future, new FutureCallback<Object>() {
138 public void onSuccess( Object result ) {
141 String theadName = Thread.currentThread().getName();
142 assertTrue( "ListenableFuture callback was not notified on the listener executor."
143 + " Expected thread name prefix \"" + expListenerThreadPrefix +
144 "\". Actual thread name \"" + theadName + "\"",
145 theadName.startsWith( expListenerThreadPrefix ) );
146 } catch( AssertionError e ) {
147 assertError.set( e );
149 futureNotifiedLatch.countDown();
154 public void onFailure( @Nonnull Throwable t ) {
162 public void testDelegatedMethods() throws InterruptedException {
164 Runnable task = () -> {
167 List<Runnable> taskList = Lists.newArrayList();
169 ExecutorService mockDelegate = mock( ExecutorService.class );
170 doNothing().when( mockDelegate ).execute( task );
171 doNothing().when( mockDelegate ).shutdown();
172 doReturn( taskList ).when( mockDelegate ).shutdownNow();
173 doReturn( true ).when( mockDelegate ).awaitTermination( 3, TimeUnit.SECONDS );
174 doReturn( true ).when( mockDelegate ).isShutdown();
175 doReturn( true ).when( mockDelegate ).isTerminated();
177 AsyncNotifyingListeningExecutorService executor = new AsyncNotifyingListeningExecutorService(
178 mockDelegate, null );
180 executor.execute( task );
182 assertEquals( "awaitTermination", true, executor.awaitTermination( 3, TimeUnit.SECONDS ) );
183 assertSame( "shutdownNow", taskList, executor.shutdownNow() );
184 assertEquals( "isShutdown", true, executor.isShutdown() );
185 assertEquals( "isTerminated", true, executor.isTerminated() );
187 verify( mockDelegate ).execute( task );
188 verify( mockDelegate ).shutdown();
189 verify( mockDelegate ).awaitTermination( 3, TimeUnit.SECONDS );
190 verify( mockDelegate ).shutdownNow();
191 verify( mockDelegate ).isShutdown();
192 verify( mockDelegate ).isTerminated();