2 * Copyright (c) 2014 Brocade Communications Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.yangtools.util.concurrent;
10 import static org.junit.Assert.assertSame;
11 import static org.junit.Assert.assertTrue;
12 import static org.junit.Assert.fail;
13 import static org.mockito.Mockito.doNothing;
14 import static org.mockito.Mockito.doReturn;
15 import static org.mockito.Mockito.mock;
16 import static org.mockito.Mockito.verify;
17 import static org.opendaylight.yangtools.util.concurrent.CommonTestUtils.SUBMIT_CALLABLE;
18 import static org.opendaylight.yangtools.util.concurrent.CommonTestUtils.SUBMIT_RUNNABLE;
19 import static org.opendaylight.yangtools.util.concurrent.CommonTestUtils.SUBMIT_RUNNABLE_WITH_RESULT;
21 import com.google.common.util.concurrent.FutureCallback;
22 import com.google.common.util.concurrent.Futures;
23 import com.google.common.util.concurrent.ListenableFuture;
24 import com.google.common.util.concurrent.MoreExecutors;
25 import com.google.common.util.concurrent.ThreadFactoryBuilder;
26 import java.util.ArrayList;
27 import java.util.List;
28 import java.util.concurrent.CountDownLatch;
29 import java.util.concurrent.ExecutorService;
30 import java.util.concurrent.Executors;
31 import java.util.concurrent.TimeUnit;
32 import java.util.concurrent.atomic.AtomicReference;
33 import org.junit.After;
34 import org.junit.Test;
35 import org.opendaylight.yangtools.util.concurrent.CommonTestUtils.Invoker;
38 * Unit tests for AsyncNotifyingListeningExecutorService.
40 * @author Thomas Pantelis
42 public class AsyncNotifyingListeningExecutorServiceTest {
44 private ExecutorService listenerExecutor;
45 private AsyncNotifyingListeningExecutorService testExecutor;
48 public void tearDown() {
49 if (listenerExecutor != null) {
50 listenerExecutor.shutdownNow();
53 if (testExecutor != null) {
54 testExecutor.shutdownNow();
59 public void testListenerCallbackWithExecutor() throws InterruptedException {
61 String listenerThreadPrefix = "ListenerThread";
62 listenerExecutor = Executors.newFixedThreadPool(3,
63 new ThreadFactoryBuilder().setNameFormat(listenerThreadPrefix + "-%d").build());
65 testExecutor = new AsyncNotifyingListeningExecutorService(
66 Executors.newSingleThreadExecutor(
67 new ThreadFactoryBuilder().setNameFormat("SingleThread").build()),
70 testListenerCallback(testExecutor, SUBMIT_CALLABLE, listenerThreadPrefix);
71 testListenerCallback(testExecutor, SUBMIT_RUNNABLE, listenerThreadPrefix);
72 testListenerCallback(testExecutor, SUBMIT_RUNNABLE_WITH_RESULT, listenerThreadPrefix);
76 public void testListenerCallbackWithNoExecutor() throws InterruptedException {
78 String listenerThreadPrefix = "SingleThread";
79 testExecutor = new AsyncNotifyingListeningExecutorService(
80 Executors.newSingleThreadExecutor(
81 new ThreadFactoryBuilder().setNameFormat(listenerThreadPrefix).build()),
84 testListenerCallback(testExecutor, SUBMIT_CALLABLE, listenerThreadPrefix);
85 testListenerCallback(testExecutor, SUBMIT_RUNNABLE, listenerThreadPrefix);
86 testListenerCallback(testExecutor, SUBMIT_RUNNABLE_WITH_RESULT, listenerThreadPrefix);
89 static void testListenerCallback(final AsyncNotifyingListeningExecutorService executor,
90 final Invoker invoker, final String expListenerThreadPrefix) throws InterruptedException {
92 AtomicReference<AssertionError> assertError = new AtomicReference<>();
93 CountDownLatch futureNotifiedLatch = new CountDownLatch(1);
94 CountDownLatch blockTaskLatch = new CountDownLatch(1);
96 // The blockTaskLatch is used to block the task from completing until we've added
97 // our listener to the Future. Otherwise, if the task completes quickly and the Future is
98 // set to done before we've added our listener, the call to ListenableFuture#addListener
99 // will immediately notify synchronously on this thread as Futures#addCallback defaults to
100 // a same thread executor. This would erroneously fail the test.
102 ListenableFuture<?> future = invoker.invokeExecutor(executor, blockTaskLatch);
103 addCallback(future, futureNotifiedLatch, expListenerThreadPrefix, assertError);
105 // Now that we've added our listener, signal the latch to let the task complete.
107 blockTaskLatch.countDown();
109 assertTrue("ListenableFuture callback was not notified of onSuccess",
110 futureNotifiedLatch.await(5, TimeUnit.SECONDS));
112 if (assertError.get() != null) {
113 throw assertError.get();
116 // Add another listener - since the Future is already complete, we expect the listener to be
117 // notified inline on this thread when it's added.
119 futureNotifiedLatch = new CountDownLatch(1);
120 addCallback(future, futureNotifiedLatch, Thread.currentThread().getName(), assertError);
122 assertTrue("ListenableFuture callback was not notified of onSuccess",
123 futureNotifiedLatch.await(5, TimeUnit.SECONDS));
125 if (assertError.get() != null) {
126 throw assertError.get();
130 static void addCallback(final ListenableFuture<?> future, final CountDownLatch futureNotifiedLatch,
131 final String expListenerThreadPrefix, final AtomicReference<AssertionError> assertError) {
133 Futures.addCallback(future, new FutureCallback<Object>() {
135 public void onSuccess(final Object result) {
137 String theadName = Thread.currentThread().getName();
138 assertTrue("ListenableFuture callback was not notified on the listener executor."
139 + " Expected thread name prefix \"" + expListenerThreadPrefix
140 + "\". Actual thread name \"" + theadName + "\"",
141 theadName.startsWith(expListenerThreadPrefix));
142 } catch (AssertionError e) {
145 futureNotifiedLatch.countDown();
150 @SuppressWarnings("checkstyle:parameterName")
151 public void onFailure(final Throwable t) {
153 fail("Unexpected failure " + t);
155 }, MoreExecutors.directExecutor());
159 public void testDelegatedMethods() throws InterruptedException {
161 Runnable task = () -> { };
163 List<Runnable> taskList = new ArrayList<>();
165 ExecutorService mockDelegate = mock(ExecutorService.class);
166 doNothing().when(mockDelegate).execute(task);
167 doNothing().when(mockDelegate).shutdown();
168 doReturn(taskList).when(mockDelegate).shutdownNow();
169 doReturn(Boolean.TRUE).when(mockDelegate).awaitTermination(3, TimeUnit.SECONDS);
170 doReturn(Boolean.TRUE).when(mockDelegate).isShutdown();
171 doReturn(Boolean.TRUE).when(mockDelegate).isTerminated();
173 AsyncNotifyingListeningExecutorService executor = new AsyncNotifyingListeningExecutorService(
176 executor.execute(task);
178 assertTrue("awaitTermination", executor.awaitTermination(3, TimeUnit.SECONDS));
179 assertSame("shutdownNow", taskList, executor.shutdownNow());
180 assertTrue("isShutdown", executor.isShutdown());
181 assertTrue("isTerminated", executor.isTerminated());
183 verify(mockDelegate).execute(task);
184 verify(mockDelegate).shutdown();
185 verify(mockDelegate).awaitTermination(3, TimeUnit.SECONDS);
186 verify(mockDelegate).shutdownNow();
187 verify(mockDelegate).isShutdown();
188 verify(mockDelegate).isTerminated();