2 * Copyright (c) 2014 Brocade Communications Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.yangtools.util.concurrent;
10 import static org.junit.jupiter.api.Assertions.assertSame;
11 import static org.junit.jupiter.api.Assertions.assertTrue;
12 import static org.junit.jupiter.api.Assertions.fail;
13 import static org.mockito.Mockito.doNothing;
14 import static org.mockito.Mockito.doReturn;
15 import static org.mockito.Mockito.mock;
16 import static org.mockito.Mockito.verify;
17 import static org.opendaylight.yangtools.util.concurrent.CommonTestUtils.SUBMIT_CALLABLE;
18 import static org.opendaylight.yangtools.util.concurrent.CommonTestUtils.SUBMIT_RUNNABLE;
19 import static org.opendaylight.yangtools.util.concurrent.CommonTestUtils.SUBMIT_RUNNABLE_WITH_RESULT;
21 import com.google.common.util.concurrent.FutureCallback;
22 import com.google.common.util.concurrent.Futures;
23 import com.google.common.util.concurrent.ListenableFuture;
24 import com.google.common.util.concurrent.MoreExecutors;
25 import com.google.common.util.concurrent.ThreadFactoryBuilder;
26 import java.util.ArrayList;
27 import java.util.concurrent.CountDownLatch;
28 import java.util.concurrent.ExecutorService;
29 import java.util.concurrent.Executors;
30 import java.util.concurrent.TimeUnit;
31 import java.util.concurrent.atomic.AtomicReference;
32 import org.junit.jupiter.api.AfterEach;
33 import org.junit.jupiter.api.Test;
34 import org.opendaylight.yangtools.util.concurrent.CommonTestUtils.Invoker;
37 * Unit tests for AsyncNotifyingListeningExecutorService.
39 * @author Thomas Pantelis
41 class AsyncNotifyingListeningExecutorServiceTest {
43 private ExecutorService listenerExecutor;
44 private AsyncNotifyingListeningExecutorService testExecutor;
48 if (listenerExecutor != null) {
49 listenerExecutor.shutdownNow();
52 if (testExecutor != null) {
53 testExecutor.shutdownNow();
58 void testListenerCallbackWithExecutor() throws InterruptedException {
60 final var listenerThreadPrefix = "ListenerThread";
61 listenerExecutor = Executors.newFixedThreadPool(3,
62 new ThreadFactoryBuilder().setNameFormat(listenerThreadPrefix + "-%d").build());
64 testExecutor = new AsyncNotifyingListeningExecutorService(
65 Executors.newSingleThreadExecutor(
66 new ThreadFactoryBuilder().setNameFormat("SingleThread").build()),
69 testListenerCallback(testExecutor, SUBMIT_CALLABLE, listenerThreadPrefix);
70 testListenerCallback(testExecutor, SUBMIT_RUNNABLE, listenerThreadPrefix);
71 testListenerCallback(testExecutor, SUBMIT_RUNNABLE_WITH_RESULT, listenerThreadPrefix);
75 void testListenerCallbackWithNoExecutor() throws InterruptedException {
77 final var listenerThreadPrefix = "SingleThread";
78 testExecutor = new AsyncNotifyingListeningExecutorService(
79 Executors.newSingleThreadExecutor(
80 new ThreadFactoryBuilder().setNameFormat(listenerThreadPrefix).build()),
83 testListenerCallback(testExecutor, SUBMIT_CALLABLE, listenerThreadPrefix);
84 testListenerCallback(testExecutor, SUBMIT_RUNNABLE, listenerThreadPrefix);
85 testListenerCallback(testExecutor, SUBMIT_RUNNABLE_WITH_RESULT, listenerThreadPrefix);
88 static void testListenerCallback(final AsyncNotifyingListeningExecutorService executor,
89 final Invoker invoker, final String expListenerThreadPrefix) throws InterruptedException {
91 final var assertError = new AtomicReference<AssertionError>();
92 var futureNotifiedLatch = new CountDownLatch(1);
93 final var blockTaskLatch = new CountDownLatch(1);
95 // The blockTaskLatch is used to block the task from completing until we've added
96 // our listener to the Future. Otherwise, if the task completes quickly and the Future is
97 // set to done before we've added our listener, the call to ListenableFuture#addListener
98 // will immediately notify synchronously on this thread as Futures#addCallback defaults to
99 // a same thread executor. This would erroneously fail the test.
101 final var future = invoker.invokeExecutor(executor, blockTaskLatch);
102 addCallback(future, futureNotifiedLatch, expListenerThreadPrefix, assertError);
104 // Now that we've added our listener, signal the latch to let the task complete.
106 blockTaskLatch.countDown();
108 assertTrue(futureNotifiedLatch.await(5, TimeUnit.SECONDS),
109 "ListenableFuture callback was not notified of onSuccess");
111 if (assertError.get() != null) {
112 throw assertError.get();
115 // Add another listener - since the Future is already complete, we expect the listener to be
116 // notified inline on this thread when it's added.
118 futureNotifiedLatch = new CountDownLatch(1);
119 addCallback(future, futureNotifiedLatch, Thread.currentThread().getName(), assertError);
121 assertTrue(futureNotifiedLatch.await(5, TimeUnit.SECONDS),
122 "ListenableFuture callback was not notified of onSuccess");
124 if (assertError.get() != null) {
125 throw assertError.get();
129 static void addCallback(final ListenableFuture<?> future, final CountDownLatch futureNotifiedLatch,
130 final String expListenerThreadPrefix, final AtomicReference<AssertionError> assertError) {
132 Futures.addCallback(future, new FutureCallback<Object>() {
134 public void onSuccess(final Object result) {
136 final var theadName = Thread.currentThread().getName();
137 assertTrue(theadName.startsWith(expListenerThreadPrefix),
138 "ListenableFuture callback was not notified on the listener executor."
139 + " Expected thread name prefix \"" + expListenerThreadPrefix
140 + "\". Actual thread name \"" + theadName + "\"");
141 } catch (AssertionError e) {
144 futureNotifiedLatch.countDown();
149 @SuppressWarnings("checkstyle:parameterName")
150 public void onFailure(final Throwable cause) {
152 fail("Unexpected failure " + cause);
154 }, MoreExecutors.directExecutor());
158 void testDelegatedMethods() throws InterruptedException {
160 final var task = (Runnable) () -> { };
162 final var taskList = new ArrayList<Runnable>();
164 final var mockDelegate = mock(ExecutorService.class);
165 doNothing().when(mockDelegate).execute(task);
166 doNothing().when(mockDelegate).shutdown();
167 doReturn(taskList).when(mockDelegate).shutdownNow();
168 doReturn(Boolean.TRUE).when(mockDelegate).awaitTermination(3, TimeUnit.SECONDS);
169 doReturn(Boolean.TRUE).when(mockDelegate).isShutdown();
170 doReturn(Boolean.TRUE).when(mockDelegate).isTerminated();
172 final var executor = new AsyncNotifyingListeningExecutorService(
175 executor.execute(task);
177 assertTrue(executor.awaitTermination(3, TimeUnit.SECONDS), "awaitTermination");
178 assertSame(taskList, executor.shutdownNow(), "shutdownNow");
179 assertTrue(executor.isShutdown(), "isShutdown");
180 assertTrue(executor.isTerminated(), "isTerminated");
182 verify(mockDelegate).execute(task);
183 verify(mockDelegate).shutdown();
184 verify(mockDelegate).awaitTermination(3, TimeUnit.SECONDS);
185 verify(mockDelegate).shutdownNow();
186 verify(mockDelegate).isShutdown();
187 verify(mockDelegate).isTerminated();