Fix eclipse/checkstyle warnings
[yangtools.git] / common / util / src / test / java / org / opendaylight / yangtools / util / concurrent / AsyncNotifyingListeningExecutorServiceTest.java
1 /*
2  * Copyright (c) 2014 Brocade Communications Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.yangtools.util.concurrent;
10
11 import static org.junit.Assert.assertSame;
12 import static org.junit.Assert.assertTrue;
13 import static org.junit.Assert.fail;
14 import static org.mockito.Mockito.doNothing;
15 import static org.mockito.Mockito.doReturn;
16 import static org.mockito.Mockito.mock;
17 import static org.mockito.Mockito.verify;
18 import static org.opendaylight.yangtools.util.concurrent.CommonTestUtils.SUBMIT_CALLABLE;
19 import static org.opendaylight.yangtools.util.concurrent.CommonTestUtils.SUBMIT_RUNNABLE;
20 import static org.opendaylight.yangtools.util.concurrent.CommonTestUtils.SUBMIT_RUNNABLE_WITH_RESULT;
21
22 import com.google.common.util.concurrent.FutureCallback;
23 import com.google.common.util.concurrent.Futures;
24 import com.google.common.util.concurrent.ListenableFuture;
25 import com.google.common.util.concurrent.ThreadFactoryBuilder;
26 import java.util.ArrayList;
27 import java.util.List;
28 import java.util.concurrent.CountDownLatch;
29 import java.util.concurrent.ExecutorService;
30 import java.util.concurrent.Executors;
31 import java.util.concurrent.TimeUnit;
32 import java.util.concurrent.atomic.AtomicReference;
33 import javax.annotation.Nonnull;
34 import org.junit.After;
35 import org.junit.Test;
36 import org.opendaylight.yangtools.util.concurrent.CommonTestUtils.Invoker;
37
38 /**
39  * Unit tests for AsyncNotifyingListeningExecutorService.
40  *
41  * @author Thomas Pantelis
42  */
43 public class AsyncNotifyingListeningExecutorServiceTest {
44
45     private ExecutorService listenerExecutor;
46     private AsyncNotifyingListeningExecutorService testExecutor;
47
48     @After
49     public void tearDown() {
50         if (listenerExecutor != null) {
51             listenerExecutor.shutdownNow();
52         }
53
54         if (testExecutor != null) {
55             testExecutor.shutdownNow();
56         }
57     }
58
59     @Test
60     public void testListenerCallbackWithExecutor() throws InterruptedException {
61
62         String listenerThreadPrefix = "ListenerThread";
63         listenerExecutor = Executors.newFixedThreadPool(3,
64                 new ThreadFactoryBuilder().setNameFormat(listenerThreadPrefix + "-%d").build());
65
66         testExecutor = new AsyncNotifyingListeningExecutorService(
67                 Executors.newSingleThreadExecutor(
68                         new ThreadFactoryBuilder().setNameFormat("SingleThread").build()),
69                 listenerExecutor);
70
71         testListenerCallback(testExecutor, SUBMIT_CALLABLE, listenerThreadPrefix);
72         testListenerCallback(testExecutor, SUBMIT_RUNNABLE, listenerThreadPrefix);
73         testListenerCallback(testExecutor, SUBMIT_RUNNABLE_WITH_RESULT, listenerThreadPrefix);
74     }
75
76     @Test
77     public void testListenerCallbackWithNoExecutor() throws InterruptedException {
78
79         String listenerThreadPrefix = "SingleThread";
80         testExecutor = new AsyncNotifyingListeningExecutorService(
81                 Executors.newSingleThreadExecutor(
82                         new ThreadFactoryBuilder().setNameFormat(listenerThreadPrefix).build()),
83                 null);
84
85         testListenerCallback(testExecutor, SUBMIT_CALLABLE, listenerThreadPrefix);
86         testListenerCallback(testExecutor, SUBMIT_RUNNABLE, listenerThreadPrefix);
87         testListenerCallback(testExecutor, SUBMIT_RUNNABLE_WITH_RESULT, listenerThreadPrefix);
88     }
89
90     static void testListenerCallback(final AsyncNotifyingListeningExecutorService executor,
91             final Invoker invoker, final String expListenerThreadPrefix) throws InterruptedException {
92
93         AtomicReference<AssertionError> assertError = new AtomicReference<>();
94         CountDownLatch futureNotifiedLatch = new CountDownLatch(1);
95         CountDownLatch blockTaskLatch = new CountDownLatch(1);
96
97         // The blockTaskLatch is used to block the task from completing until we've added
98         // our listener to the Future. Otherwise, if the task completes quickly and the Future is
99         // set to done before we've added our listener, the call to ListenableFuture#addListener
100         // will immediately notify synchronously on this thread as Futures#addCallback defaults to
101         // a same thread executor. This would erroneously fail the test.
102
103         ListenableFuture<?> future = invoker.invokeExecutor(executor, blockTaskLatch);
104         addCallback(future, futureNotifiedLatch, expListenerThreadPrefix, assertError);
105
106         // Now that we've added our listener, signal the latch to let the task complete.
107
108         blockTaskLatch.countDown();
109
110         assertTrue("ListenableFuture callback was not notified of onSuccess",
111                     futureNotifiedLatch.await(5, TimeUnit.SECONDS));
112
113         if (assertError.get() != null) {
114             throw assertError.get();
115         }
116
117         // Add another listener - since the Future is already complete, we expect the listener to be
118         // notified inline on this thread when it's added.
119
120         futureNotifiedLatch = new CountDownLatch(1);
121         addCallback(future, futureNotifiedLatch, Thread.currentThread().getName(), assertError);
122
123         assertTrue("ListenableFuture callback was not notified of onSuccess",
124                     futureNotifiedLatch.await(5, TimeUnit.SECONDS));
125
126         if (assertError.get() != null) {
127             throw assertError.get();
128         }
129     }
130
131     static void addCallback(final ListenableFuture<?> future, final CountDownLatch futureNotifiedLatch,
132             final String expListenerThreadPrefix, final AtomicReference<AssertionError> assertError) {
133
134         Futures.addCallback(future, new FutureCallback<Object>() {
135             @Override
136             public void onSuccess(final Object result) {
137                 try {
138                     String theadName = Thread.currentThread().getName();
139                     assertTrue("ListenableFuture callback was not notified on the listener executor."
140                         + " Expected thread name prefix \"" + expListenerThreadPrefix
141                         + "\". Actual thread name \"" + theadName + "\"",
142                             theadName.startsWith(expListenerThreadPrefix));
143                 } catch (AssertionError e) {
144                     assertError.set(e);
145                 } finally {
146                     futureNotifiedLatch.countDown();
147                 }
148             }
149
150             @Override
151             @SuppressWarnings("checkstyle:parameterName")
152             public void onFailure(@Nonnull final Throwable t) {
153                 // Shouldn't happen
154                 fail("Unexpected failure " + t);
155             }
156         });
157     }
158
159     @Test
160     public void testDelegatedMethods() throws InterruptedException {
161
162         Runnable task = () -> { };
163
164         List<Runnable> taskList = new ArrayList<>();
165
166         ExecutorService mockDelegate = mock(ExecutorService.class);
167         doNothing().when(mockDelegate).execute(task);
168         doNothing().when(mockDelegate).shutdown();
169         doReturn(taskList).when(mockDelegate).shutdownNow();
170         doReturn(Boolean.TRUE).when(mockDelegate).awaitTermination(3, TimeUnit.SECONDS);
171         doReturn(Boolean.TRUE).when(mockDelegate).isShutdown();
172         doReturn(Boolean.TRUE).when(mockDelegate).isTerminated();
173
174         AsyncNotifyingListeningExecutorService executor = new AsyncNotifyingListeningExecutorService(
175                                                                    mockDelegate, null);
176
177         executor.execute(task);
178         executor.shutdown();
179         assertTrue("awaitTermination", executor.awaitTermination(3, TimeUnit.SECONDS));
180         assertSame("shutdownNow", taskList, executor.shutdownNow());
181         assertTrue("isShutdown", executor.isShutdown());
182         assertTrue("isTerminated", executor.isTerminated());
183
184         verify(mockDelegate).execute(task);
185         verify(mockDelegate).shutdown();
186         verify(mockDelegate).awaitTermination(3, TimeUnit.SECONDS);
187         verify(mockDelegate).shutdownNow();
188         verify(mockDelegate).isShutdown();
189         verify(mockDelegate).isTerminated();
190     }
191 }