15c9162dddd9784d382d4777a2870b0a6fecf335
[yangtools.git] / common / util / src / test / java / org / opendaylight / yangtools / util / concurrent / AsyncNotifyingListeningExecutorServiceTest.java
1 /*
2  * Copyright (c) 2014 Brocade Communications Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.yangtools.util.concurrent;
10
11 import static org.junit.Assert.assertEquals;
12 import static org.junit.Assert.assertSame;
13 import static org.junit.Assert.assertTrue;
14
15 import java.util.List;
16 import java.util.concurrent.CountDownLatch;
17 import java.util.concurrent.ExecutorService;
18 import java.util.concurrent.Executors;
19 import java.util.concurrent.TimeUnit;
20 import java.util.concurrent.atomic.AtomicReference;
21
22 import org.junit.After;
23 import org.junit.Test;
24
25 import static org.mockito.Mockito.mock;
26 import static org.mockito.Mockito.verify;
27 import static org.mockito.Mockito.doNothing;
28 import static org.mockito.Mockito.doReturn;
29
30 import com.google.common.collect.Lists;
31 import com.google.common.util.concurrent.FutureCallback;
32 import com.google.common.util.concurrent.Futures;
33 import com.google.common.util.concurrent.ListenableFuture;
34 import com.google.common.util.concurrent.ThreadFactoryBuilder;
35
36 import static org.opendaylight.yangtools.util.concurrent.CommonTestUtils.Invoker;
37 import static org.opendaylight.yangtools.util.concurrent.CommonTestUtils.SUBMIT_CALLABLE;
38 import static org.opendaylight.yangtools.util.concurrent.CommonTestUtils.SUBMIT_RUNNABLE;
39 import static org.opendaylight.yangtools.util.concurrent.CommonTestUtils.SUBMIT_RUNNABLE_WITH_RESULT;
40
41 /**
42  * Unit tests for AsyncNotifyingListeningExecutorService.
43  *
44  * @author Thomas Pantelis
45  */
46 public class AsyncNotifyingListeningExecutorServiceTest {
47
48     private ExecutorService listenerExecutor;
49     private AsyncNotifyingListeningExecutorService testExecutor;
50
51     @After
52     public void tearDown() {
53         if (listenerExecutor != null ) {
54             listenerExecutor.shutdownNow();
55         }
56
57         if (testExecutor != null ) {
58             testExecutor.shutdownNow();
59         }
60     }
61
62     @Test
63     public void testListenerCallbackWithExecutor() throws InterruptedException {
64
65         String listenerThreadPrefix = "ListenerThread";
66         listenerExecutor = Executors.newFixedThreadPool( 3,
67                 new ThreadFactoryBuilder().setNameFormat( listenerThreadPrefix + "-%d" ).build() );
68
69         testExecutor = new AsyncNotifyingListeningExecutorService(
70                 Executors.newSingleThreadExecutor(
71                         new ThreadFactoryBuilder().setNameFormat( "SingleThread" ).build() ),
72                 listenerExecutor );
73
74         testListenerCallback( testExecutor, SUBMIT_CALLABLE, listenerThreadPrefix );
75         testListenerCallback( testExecutor, SUBMIT_RUNNABLE, listenerThreadPrefix );
76         testListenerCallback( testExecutor, SUBMIT_RUNNABLE_WITH_RESULT, listenerThreadPrefix );
77     }
78
79     @Test
80     public void testListenerCallbackWithNoExecutor() throws InterruptedException {
81
82         String listenerThreadPrefix = "SingleThread";
83         testExecutor = new AsyncNotifyingListeningExecutorService(
84                 Executors.newSingleThreadExecutor(
85                         new ThreadFactoryBuilder().setNameFormat( listenerThreadPrefix ).build() ),
86                 null );
87
88         testListenerCallback( testExecutor, SUBMIT_CALLABLE, listenerThreadPrefix );
89         testListenerCallback( testExecutor, SUBMIT_RUNNABLE, listenerThreadPrefix );
90         testListenerCallback( testExecutor, SUBMIT_RUNNABLE_WITH_RESULT, listenerThreadPrefix );
91     }
92
93     static void testListenerCallback( AsyncNotifyingListeningExecutorService executor,
94             Invoker invoker, final String expListenerThreadPrefix ) throws InterruptedException {
95
96         AtomicReference<AssertionError> assertError = new AtomicReference<>();
97         CountDownLatch futureNotifiedLatch = new CountDownLatch( 1 );
98         CountDownLatch blockTaskLatch = new CountDownLatch( 1 );
99
100         // The blockTaskLatch is used to block the task from completing until we've added
101         // our listener to the Future. Otherwise, if the task completes quickly and the Future is
102         // set to done before we've added our listener, the call to ListenableFuture#addListener
103         // will immediately notify synchronously on this thread as Futures#addCallback defaults to
104         // a same thread executor. This would erroneously fail the test.
105
106         ListenableFuture<?> future = invoker.invokeExecutor( executor, blockTaskLatch );
107         addCallback( future, futureNotifiedLatch, expListenerThreadPrefix, assertError );
108
109         // Now that we've added our listener, signal the latch to let the task complete.
110
111         blockTaskLatch.countDown();
112
113         assertTrue( "ListenableFuture callback was not notified of onSuccess",
114                     futureNotifiedLatch.await( 5, TimeUnit.SECONDS ) );
115
116         if (assertError.get() != null ) {
117             throw assertError.get();
118         }
119
120         // Add another listener - since the Future is already complete, we expect the listener to be
121         // notified inline on this thread when it's added.
122
123         futureNotifiedLatch = new CountDownLatch( 1 );
124         addCallback( future, futureNotifiedLatch, Thread.currentThread().getName(), assertError );
125
126         assertTrue( "ListenableFuture callback was not notified of onSuccess",
127                     futureNotifiedLatch.await( 5, TimeUnit.SECONDS ) );
128
129         if (assertError.get() != null ) {
130             throw assertError.get();
131         }
132     }
133
134     static void addCallback( ListenableFuture<?> future,
135             final CountDownLatch futureNotifiedLatch,
136             final String expListenerThreadPrefix,
137             final AtomicReference<AssertionError> assertError ) {
138
139         Futures.addCallback( future, new FutureCallback<Object>() {
140             @Override
141             public void onSuccess( Object result ) {
142
143                 try {
144                     String theadName = Thread.currentThread().getName();
145                     assertTrue( "ListenableFuture callback was not notified on the listener executor."
146                         + " Expected thread name prefix \"" + expListenerThreadPrefix +
147                             "\". Actual thread name \"" + theadName + "\"",
148                             theadName.startsWith( expListenerThreadPrefix ) );
149                 } catch( AssertionError e ) {
150                     assertError.set( e );
151                 } finally {
152                     futureNotifiedLatch.countDown();
153                 }
154             }
155
156             @Override
157             public void onFailure( Throwable t ) {
158                 // Shouldn't happen
159                 t.printStackTrace();
160             }
161         } );
162     }
163
164     @Test
165     public void testDelegatedMethods() throws InterruptedException {
166
167         Runnable task = new Runnable() {
168             @Override
169             public void run(){
170             }
171         };
172
173         List<Runnable> taskList = Lists.newArrayList();
174
175         ExecutorService mockDelegate = mock( ExecutorService.class );
176         doNothing().when( mockDelegate ).execute( task );
177         doNothing().when( mockDelegate ).shutdown();
178         doReturn( taskList ).when( mockDelegate ).shutdownNow();
179         doReturn( true ).when( mockDelegate ).awaitTermination( 3, TimeUnit.SECONDS );
180         doReturn( true ).when( mockDelegate ).isShutdown();
181         doReturn( true ).when( mockDelegate ).isTerminated();
182
183         AsyncNotifyingListeningExecutorService executor = new AsyncNotifyingListeningExecutorService(
184                                                                    mockDelegate, null );
185
186         executor.execute( task );
187         executor.shutdown();
188         assertEquals( "awaitTermination", true, executor.awaitTermination( 3, TimeUnit.SECONDS ) );
189         assertSame( "shutdownNow", taskList, executor.shutdownNow() );
190         assertEquals( "isShutdown", true, executor.isShutdown() );
191         assertEquals( "isTerminated", true, executor.isTerminated() );
192
193         verify( mockDelegate ).execute( task );
194         verify( mockDelegate ).shutdown();
195         verify( mockDelegate ).awaitTermination( 3, TimeUnit.SECONDS );
196         verify( mockDelegate ).shutdownNow();
197         verify( mockDelegate ).isShutdown();
198         verify( mockDelegate ).isTerminated();
199     }
200 }