Organize Imports to be Checkstyle compliant in utils
[yangtools.git] / common / util / src / test / java / org / opendaylight / yangtools / util / concurrent / AsyncNotifyingListeningExecutorServiceTest.java
1 /*
2  * Copyright (c) 2014 Brocade Communications Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.yangtools.util.concurrent;
10
11 import static org.junit.Assert.assertEquals;
12 import static org.junit.Assert.assertSame;
13 import static org.junit.Assert.assertTrue;
14 import static org.mockito.Mockito.doNothing;
15 import static org.mockito.Mockito.doReturn;
16 import static org.mockito.Mockito.mock;
17 import static org.mockito.Mockito.verify;
18 import static org.opendaylight.yangtools.util.concurrent.CommonTestUtils.SUBMIT_CALLABLE;
19 import static org.opendaylight.yangtools.util.concurrent.CommonTestUtils.SUBMIT_RUNNABLE;
20 import static org.opendaylight.yangtools.util.concurrent.CommonTestUtils.SUBMIT_RUNNABLE_WITH_RESULT;
21
22 import com.google.common.collect.Lists;
23 import com.google.common.util.concurrent.FutureCallback;
24 import com.google.common.util.concurrent.Futures;
25 import com.google.common.util.concurrent.ListenableFuture;
26 import com.google.common.util.concurrent.ThreadFactoryBuilder;
27 import java.util.List;
28 import java.util.concurrent.CountDownLatch;
29 import java.util.concurrent.ExecutorService;
30 import java.util.concurrent.Executors;
31 import java.util.concurrent.TimeUnit;
32 import java.util.concurrent.atomic.AtomicReference;
33 import org.junit.After;
34 import org.junit.Test;
35 import org.opendaylight.yangtools.util.concurrent.CommonTestUtils.Invoker;
36
37 /**
38  * Unit tests for AsyncNotifyingListeningExecutorService.
39  *
40  * @author Thomas Pantelis
41  */
42 public class AsyncNotifyingListeningExecutorServiceTest {
43
44     private ExecutorService listenerExecutor;
45     private AsyncNotifyingListeningExecutorService testExecutor;
46
47     @After
48     public void tearDown() {
49         if (listenerExecutor != null ) {
50             listenerExecutor.shutdownNow();
51         }
52
53         if (testExecutor != null ) {
54             testExecutor.shutdownNow();
55         }
56     }
57
58     @Test
59     public void testListenerCallbackWithExecutor() throws InterruptedException {
60
61         String listenerThreadPrefix = "ListenerThread";
62         listenerExecutor = Executors.newFixedThreadPool( 3,
63                 new ThreadFactoryBuilder().setNameFormat( listenerThreadPrefix + "-%d" ).build() );
64
65         testExecutor = new AsyncNotifyingListeningExecutorService(
66                 Executors.newSingleThreadExecutor(
67                         new ThreadFactoryBuilder().setNameFormat( "SingleThread" ).build() ),
68                 listenerExecutor );
69
70         testListenerCallback( testExecutor, SUBMIT_CALLABLE, listenerThreadPrefix );
71         testListenerCallback( testExecutor, SUBMIT_RUNNABLE, listenerThreadPrefix );
72         testListenerCallback( testExecutor, SUBMIT_RUNNABLE_WITH_RESULT, listenerThreadPrefix );
73     }
74
75     @Test
76     public void testListenerCallbackWithNoExecutor() throws InterruptedException {
77
78         String listenerThreadPrefix = "SingleThread";
79         testExecutor = new AsyncNotifyingListeningExecutorService(
80                 Executors.newSingleThreadExecutor(
81                         new ThreadFactoryBuilder().setNameFormat( listenerThreadPrefix ).build() ),
82                 null );
83
84         testListenerCallback( testExecutor, SUBMIT_CALLABLE, listenerThreadPrefix );
85         testListenerCallback( testExecutor, SUBMIT_RUNNABLE, listenerThreadPrefix );
86         testListenerCallback( testExecutor, SUBMIT_RUNNABLE_WITH_RESULT, listenerThreadPrefix );
87     }
88
89     static void testListenerCallback( AsyncNotifyingListeningExecutorService executor,
90             Invoker invoker, final String expListenerThreadPrefix ) throws InterruptedException {
91
92         AtomicReference<AssertionError> assertError = new AtomicReference<>();
93         CountDownLatch futureNotifiedLatch = new CountDownLatch( 1 );
94         CountDownLatch blockTaskLatch = new CountDownLatch( 1 );
95
96         // The blockTaskLatch is used to block the task from completing until we've added
97         // our listener to the Future. Otherwise, if the task completes quickly and the Future is
98         // set to done before we've added our listener, the call to ListenableFuture#addListener
99         // will immediately notify synchronously on this thread as Futures#addCallback defaults to
100         // a same thread executor. This would erroneously fail the test.
101
102         ListenableFuture<?> future = invoker.invokeExecutor( executor, blockTaskLatch );
103         addCallback( future, futureNotifiedLatch, expListenerThreadPrefix, assertError );
104
105         // Now that we've added our listener, signal the latch to let the task complete.
106
107         blockTaskLatch.countDown();
108
109         assertTrue( "ListenableFuture callback was not notified of onSuccess",
110                     futureNotifiedLatch.await( 5, TimeUnit.SECONDS ) );
111
112         if (assertError.get() != null ) {
113             throw assertError.get();
114         }
115
116         // Add another listener - since the Future is already complete, we expect the listener to be
117         // notified inline on this thread when it's added.
118
119         futureNotifiedLatch = new CountDownLatch( 1 );
120         addCallback( future, futureNotifiedLatch, Thread.currentThread().getName(), assertError );
121
122         assertTrue( "ListenableFuture callback was not notified of onSuccess",
123                     futureNotifiedLatch.await( 5, TimeUnit.SECONDS ) );
124
125         if (assertError.get() != null ) {
126             throw assertError.get();
127         }
128     }
129
130     static void addCallback( ListenableFuture<?> future,
131             final CountDownLatch futureNotifiedLatch,
132             final String expListenerThreadPrefix,
133             final AtomicReference<AssertionError> assertError ) {
134
135         Futures.addCallback( future, new FutureCallback<Object>() {
136             @Override
137             public void onSuccess( Object result ) {
138
139                 try {
140                     String theadName = Thread.currentThread().getName();
141                     assertTrue( "ListenableFuture callback was not notified on the listener executor."
142                         + " Expected thread name prefix \"" + expListenerThreadPrefix +
143                             "\". Actual thread name \"" + theadName + "\"",
144                             theadName.startsWith( expListenerThreadPrefix ) );
145                 } catch( AssertionError e ) {
146                     assertError.set( e );
147                 } finally {
148                     futureNotifiedLatch.countDown();
149                 }
150             }
151
152             @Override
153             public void onFailure( Throwable t ) {
154                 // Shouldn't happen
155                 t.printStackTrace();
156             }
157         } );
158     }
159
160     @Test
161     public void testDelegatedMethods() throws InterruptedException {
162
163         Runnable task = new Runnable() {
164             @Override
165             public void run(){
166             }
167         };
168
169         List<Runnable> taskList = Lists.newArrayList();
170
171         ExecutorService mockDelegate = mock( ExecutorService.class );
172         doNothing().when( mockDelegate ).execute( task );
173         doNothing().when( mockDelegate ).shutdown();
174         doReturn( taskList ).when( mockDelegate ).shutdownNow();
175         doReturn( true ).when( mockDelegate ).awaitTermination( 3, TimeUnit.SECONDS );
176         doReturn( true ).when( mockDelegate ).isShutdown();
177         doReturn( true ).when( mockDelegate ).isTerminated();
178
179         AsyncNotifyingListeningExecutorService executor = new AsyncNotifyingListeningExecutorService(
180                                                                    mockDelegate, null );
181
182         executor.execute( task );
183         executor.shutdown();
184         assertEquals( "awaitTermination", true, executor.awaitTermination( 3, TimeUnit.SECONDS ) );
185         assertSame( "shutdownNow", taskList, executor.shutdownNow() );
186         assertEquals( "isShutdown", true, executor.isShutdown() );
187         assertEquals( "isTerminated", true, executor.isTerminated() );
188
189         verify( mockDelegate ).execute( task );
190         verify( mockDelegate ).shutdown();
191         verify( mockDelegate ).awaitTermination( 3, TimeUnit.SECONDS );
192         verify( mockDelegate ).shutdownNow();
193         verify( mockDelegate ).isShutdown();
194         verify( mockDelegate ).isTerminated();
195     }
196 }