Migrate common/util to JUnit5
[yangtools.git] / common / util / src / test / java / org / opendaylight / yangtools / util / concurrent / AsyncNotifyingListeningExecutorServiceTest.java
1 /*
2  * Copyright (c) 2014 Brocade Communications Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.yangtools.util.concurrent;
9
10 import static org.junit.jupiter.api.Assertions.assertSame;
11 import static org.junit.jupiter.api.Assertions.assertTrue;
12 import static org.junit.jupiter.api.Assertions.fail;
13 import static org.mockito.Mockito.doNothing;
14 import static org.mockito.Mockito.doReturn;
15 import static org.mockito.Mockito.mock;
16 import static org.mockito.Mockito.verify;
17 import static org.opendaylight.yangtools.util.concurrent.CommonTestUtils.SUBMIT_CALLABLE;
18 import static org.opendaylight.yangtools.util.concurrent.CommonTestUtils.SUBMIT_RUNNABLE;
19 import static org.opendaylight.yangtools.util.concurrent.CommonTestUtils.SUBMIT_RUNNABLE_WITH_RESULT;
20
21 import com.google.common.util.concurrent.FutureCallback;
22 import com.google.common.util.concurrent.Futures;
23 import com.google.common.util.concurrent.ListenableFuture;
24 import com.google.common.util.concurrent.MoreExecutors;
25 import com.google.common.util.concurrent.ThreadFactoryBuilder;
26 import java.util.ArrayList;
27 import java.util.concurrent.CountDownLatch;
28 import java.util.concurrent.ExecutorService;
29 import java.util.concurrent.Executors;
30 import java.util.concurrent.TimeUnit;
31 import java.util.concurrent.atomic.AtomicReference;
32 import org.junit.jupiter.api.AfterEach;
33 import org.junit.jupiter.api.Test;
34 import org.opendaylight.yangtools.util.concurrent.CommonTestUtils.Invoker;
35
36 /**
37  * Unit tests for AsyncNotifyingListeningExecutorService.
38  *
39  * @author Thomas Pantelis
40  */
41 class AsyncNotifyingListeningExecutorServiceTest {
42
43     private ExecutorService listenerExecutor;
44     private AsyncNotifyingListeningExecutorService testExecutor;
45
46     @AfterEach
47     void tearDown() {
48         if (listenerExecutor != null) {
49             listenerExecutor.shutdownNow();
50         }
51
52         if (testExecutor != null) {
53             testExecutor.shutdownNow();
54         }
55     }
56
57     @Test
58     void testListenerCallbackWithExecutor() throws InterruptedException {
59
60         final var listenerThreadPrefix = "ListenerThread";
61         listenerExecutor = Executors.newFixedThreadPool(3,
62                 new ThreadFactoryBuilder().setNameFormat(listenerThreadPrefix + "-%d").build());
63
64         testExecutor = new AsyncNotifyingListeningExecutorService(
65                 Executors.newSingleThreadExecutor(
66                         new ThreadFactoryBuilder().setNameFormat("SingleThread").build()),
67                 listenerExecutor);
68
69         testListenerCallback(testExecutor, SUBMIT_CALLABLE, listenerThreadPrefix);
70         testListenerCallback(testExecutor, SUBMIT_RUNNABLE, listenerThreadPrefix);
71         testListenerCallback(testExecutor, SUBMIT_RUNNABLE_WITH_RESULT, listenerThreadPrefix);
72     }
73
74     @Test
75     void testListenerCallbackWithNoExecutor() throws InterruptedException {
76
77         final var listenerThreadPrefix = "SingleThread";
78         testExecutor = new AsyncNotifyingListeningExecutorService(
79                 Executors.newSingleThreadExecutor(
80                         new ThreadFactoryBuilder().setNameFormat(listenerThreadPrefix).build()),
81                 null);
82
83         testListenerCallback(testExecutor, SUBMIT_CALLABLE, listenerThreadPrefix);
84         testListenerCallback(testExecutor, SUBMIT_RUNNABLE, listenerThreadPrefix);
85         testListenerCallback(testExecutor, SUBMIT_RUNNABLE_WITH_RESULT, listenerThreadPrefix);
86     }
87
88     static void testListenerCallback(final AsyncNotifyingListeningExecutorService executor,
89             final Invoker invoker, final String expListenerThreadPrefix) throws InterruptedException {
90
91         final var assertError = new AtomicReference<AssertionError>();
92         var futureNotifiedLatch = new CountDownLatch(1);
93         final var blockTaskLatch = new CountDownLatch(1);
94
95         // The blockTaskLatch is used to block the task from completing until we've added
96         // our listener to the Future. Otherwise, if the task completes quickly and the Future is
97         // set to done before we've added our listener, the call to ListenableFuture#addListener
98         // will immediately notify synchronously on this thread as Futures#addCallback defaults to
99         // a same thread executor. This would erroneously fail the test.
100
101         final var future = invoker.invokeExecutor(executor, blockTaskLatch);
102         addCallback(future, futureNotifiedLatch, expListenerThreadPrefix, assertError);
103
104         // Now that we've added our listener, signal the latch to let the task complete.
105
106         blockTaskLatch.countDown();
107
108         assertTrue(futureNotifiedLatch.await(5, TimeUnit.SECONDS),
109                     "ListenableFuture callback was not notified of onSuccess");
110
111         if (assertError.get() != null) {
112             throw assertError.get();
113         }
114
115         // Add another listener - since the Future is already complete, we expect the listener to be
116         // notified inline on this thread when it's added.
117
118         futureNotifiedLatch = new CountDownLatch(1);
119         addCallback(future, futureNotifiedLatch, Thread.currentThread().getName(), assertError);
120
121         assertTrue(futureNotifiedLatch.await(5, TimeUnit.SECONDS),
122                     "ListenableFuture callback was not notified of onSuccess");
123
124         if (assertError.get() != null) {
125             throw assertError.get();
126         }
127     }
128
129     static void addCallback(final ListenableFuture<?> future, final CountDownLatch futureNotifiedLatch,
130             final String expListenerThreadPrefix, final AtomicReference<AssertionError> assertError) {
131
132         Futures.addCallback(future, new FutureCallback<Object>() {
133             @Override
134             public void onSuccess(final Object result) {
135                 try {
136                     final var theadName = Thread.currentThread().getName();
137                     assertTrue(theadName.startsWith(expListenerThreadPrefix),
138                             "ListenableFuture callback was not notified on the listener executor."
139                         + " Expected thread name prefix \"" + expListenerThreadPrefix
140                         + "\". Actual thread name \"" + theadName + "\"");
141                 } catch (AssertionError e) {
142                     assertError.set(e);
143                 } finally {
144                     futureNotifiedLatch.countDown();
145                 }
146             }
147
148             @Override
149             @SuppressWarnings("checkstyle:parameterName")
150             public void onFailure(final Throwable cause) {
151                 // Shouldn't happen
152                 fail("Unexpected failure " + cause);
153             }
154         }, MoreExecutors.directExecutor());
155     }
156
157     @Test
158     void testDelegatedMethods() throws InterruptedException {
159
160         final var task = (Runnable) () -> { };
161
162         final var taskList = new ArrayList<Runnable>();
163
164         final var mockDelegate = mock(ExecutorService.class);
165         doNothing().when(mockDelegate).execute(task);
166         doNothing().when(mockDelegate).shutdown();
167         doReturn(taskList).when(mockDelegate).shutdownNow();
168         doReturn(Boolean.TRUE).when(mockDelegate).awaitTermination(3, TimeUnit.SECONDS);
169         doReturn(Boolean.TRUE).when(mockDelegate).isShutdown();
170         doReturn(Boolean.TRUE).when(mockDelegate).isTerminated();
171
172         final var executor = new AsyncNotifyingListeningExecutorService(
173                                                                    mockDelegate, null);
174
175         executor.execute(task);
176         executor.shutdown();
177         assertTrue(executor.awaitTermination(3, TimeUnit.SECONDS), "awaitTermination");
178         assertSame(taskList, executor.shutdownNow(), "shutdownNow");
179         assertTrue(executor.isShutdown(), "isShutdown");
180         assertTrue(executor.isTerminated(), "isTerminated");
181
182         verify(mockDelegate).execute(task);
183         verify(mockDelegate).shutdown();
184         verify(mockDelegate).awaitTermination(3, TimeUnit.SECONDS);
185         verify(mockDelegate).shutdownNow();
186         verify(mockDelegate).isShutdown();
187         verify(mockDelegate).isTerminated();
188     }
189 }