Merge branch 'master' of ../controller
[yangtools.git] / common / util / src / test / java / org / opendaylight / yangtools / util / concurrent / AsyncNotifyingListeningExecutorServiceTest.java
1 /*
2  * Copyright (c) 2014 Brocade Communications Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.yangtools.util.concurrent;
9
10 import static org.junit.Assert.assertSame;
11 import static org.junit.Assert.assertTrue;
12 import static org.junit.Assert.fail;
13 import static org.mockito.Mockito.doNothing;
14 import static org.mockito.Mockito.doReturn;
15 import static org.mockito.Mockito.mock;
16 import static org.mockito.Mockito.verify;
17 import static org.opendaylight.yangtools.util.concurrent.CommonTestUtils.SUBMIT_CALLABLE;
18 import static org.opendaylight.yangtools.util.concurrent.CommonTestUtils.SUBMIT_RUNNABLE;
19 import static org.opendaylight.yangtools.util.concurrent.CommonTestUtils.SUBMIT_RUNNABLE_WITH_RESULT;
20
21 import com.google.common.util.concurrent.FutureCallback;
22 import com.google.common.util.concurrent.Futures;
23 import com.google.common.util.concurrent.ListenableFuture;
24 import com.google.common.util.concurrent.MoreExecutors;
25 import com.google.common.util.concurrent.ThreadFactoryBuilder;
26 import java.util.ArrayList;
27 import java.util.List;
28 import java.util.concurrent.CountDownLatch;
29 import java.util.concurrent.ExecutorService;
30 import java.util.concurrent.Executors;
31 import java.util.concurrent.TimeUnit;
32 import java.util.concurrent.atomic.AtomicReference;
33 import org.junit.After;
34 import org.junit.Test;
35 import org.opendaylight.yangtools.util.concurrent.CommonTestUtils.Invoker;
36
37 /**
38  * Unit tests for AsyncNotifyingListeningExecutorService.
39  *
40  * @author Thomas Pantelis
41  */
42 public class AsyncNotifyingListeningExecutorServiceTest {
43
44     private ExecutorService listenerExecutor;
45     private AsyncNotifyingListeningExecutorService testExecutor;
46
47     @After
48     public void tearDown() {
49         if (listenerExecutor != null) {
50             listenerExecutor.shutdownNow();
51         }
52
53         if (testExecutor != null) {
54             testExecutor.shutdownNow();
55         }
56     }
57
58     @Test
59     public void testListenerCallbackWithExecutor() throws InterruptedException {
60
61         String listenerThreadPrefix = "ListenerThread";
62         listenerExecutor = Executors.newFixedThreadPool(3,
63                 new ThreadFactoryBuilder().setNameFormat(listenerThreadPrefix + "-%d").build());
64
65         testExecutor = new AsyncNotifyingListeningExecutorService(
66                 Executors.newSingleThreadExecutor(
67                         new ThreadFactoryBuilder().setNameFormat("SingleThread").build()),
68                 listenerExecutor);
69
70         testListenerCallback(testExecutor, SUBMIT_CALLABLE, listenerThreadPrefix);
71         testListenerCallback(testExecutor, SUBMIT_RUNNABLE, listenerThreadPrefix);
72         testListenerCallback(testExecutor, SUBMIT_RUNNABLE_WITH_RESULT, listenerThreadPrefix);
73     }
74
75     @Test
76     public void testListenerCallbackWithNoExecutor() throws InterruptedException {
77
78         String listenerThreadPrefix = "SingleThread";
79         testExecutor = new AsyncNotifyingListeningExecutorService(
80                 Executors.newSingleThreadExecutor(
81                         new ThreadFactoryBuilder().setNameFormat(listenerThreadPrefix).build()),
82                 null);
83
84         testListenerCallback(testExecutor, SUBMIT_CALLABLE, listenerThreadPrefix);
85         testListenerCallback(testExecutor, SUBMIT_RUNNABLE, listenerThreadPrefix);
86         testListenerCallback(testExecutor, SUBMIT_RUNNABLE_WITH_RESULT, listenerThreadPrefix);
87     }
88
89     static void testListenerCallback(final AsyncNotifyingListeningExecutorService executor,
90             final Invoker invoker, final String expListenerThreadPrefix) throws InterruptedException {
91
92         AtomicReference<AssertionError> assertError = new AtomicReference<>();
93         CountDownLatch futureNotifiedLatch = new CountDownLatch(1);
94         CountDownLatch blockTaskLatch = new CountDownLatch(1);
95
96         // The blockTaskLatch is used to block the task from completing until we've added
97         // our listener to the Future. Otherwise, if the task completes quickly and the Future is
98         // set to done before we've added our listener, the call to ListenableFuture#addListener
99         // will immediately notify synchronously on this thread as Futures#addCallback defaults to
100         // a same thread executor. This would erroneously fail the test.
101
102         ListenableFuture<?> future = invoker.invokeExecutor(executor, blockTaskLatch);
103         addCallback(future, futureNotifiedLatch, expListenerThreadPrefix, assertError);
104
105         // Now that we've added our listener, signal the latch to let the task complete.
106
107         blockTaskLatch.countDown();
108
109         assertTrue("ListenableFuture callback was not notified of onSuccess",
110                     futureNotifiedLatch.await(5, TimeUnit.SECONDS));
111
112         if (assertError.get() != null) {
113             throw assertError.get();
114         }
115
116         // Add another listener - since the Future is already complete, we expect the listener to be
117         // notified inline on this thread when it's added.
118
119         futureNotifiedLatch = new CountDownLatch(1);
120         addCallback(future, futureNotifiedLatch, Thread.currentThread().getName(), assertError);
121
122         assertTrue("ListenableFuture callback was not notified of onSuccess",
123                     futureNotifiedLatch.await(5, TimeUnit.SECONDS));
124
125         if (assertError.get() != null) {
126             throw assertError.get();
127         }
128     }
129
130     static void addCallback(final ListenableFuture<?> future, final CountDownLatch futureNotifiedLatch,
131             final String expListenerThreadPrefix, final AtomicReference<AssertionError> assertError) {
132
133         Futures.addCallback(future, new FutureCallback<Object>() {
134             @Override
135             public void onSuccess(final Object result) {
136                 try {
137                     String theadName = Thread.currentThread().getName();
138                     assertTrue("ListenableFuture callback was not notified on the listener executor."
139                         + " Expected thread name prefix \"" + expListenerThreadPrefix
140                         + "\". Actual thread name \"" + theadName + "\"",
141                             theadName.startsWith(expListenerThreadPrefix));
142                 } catch (AssertionError e) {
143                     assertError.set(e);
144                 } finally {
145                     futureNotifiedLatch.countDown();
146                 }
147             }
148
149             @Override
150             @SuppressWarnings("checkstyle:parameterName")
151             public void onFailure(final Throwable t) {
152                 // Shouldn't happen
153                 fail("Unexpected failure " + t);
154             }
155         }, MoreExecutors.directExecutor());
156     }
157
158     @Test
159     public void testDelegatedMethods() throws InterruptedException {
160
161         Runnable task = () -> { };
162
163         List<Runnable> taskList = new ArrayList<>();
164
165         ExecutorService mockDelegate = mock(ExecutorService.class);
166         doNothing().when(mockDelegate).execute(task);
167         doNothing().when(mockDelegate).shutdown();
168         doReturn(taskList).when(mockDelegate).shutdownNow();
169         doReturn(Boolean.TRUE).when(mockDelegate).awaitTermination(3, TimeUnit.SECONDS);
170         doReturn(Boolean.TRUE).when(mockDelegate).isShutdown();
171         doReturn(Boolean.TRUE).when(mockDelegate).isTerminated();
172
173         AsyncNotifyingListeningExecutorService executor = new AsyncNotifyingListeningExecutorService(
174                                                                    mockDelegate, null);
175
176         executor.execute(task);
177         executor.shutdown();
178         assertTrue("awaitTermination", executor.awaitTermination(3, TimeUnit.SECONDS));
179         assertSame("shutdownNow", taskList, executor.shutdownNow());
180         assertTrue("isShutdown", executor.isShutdown());
181         assertTrue("isTerminated", executor.isTerminated());
182
183         verify(mockDelegate).execute(task);
184         verify(mockDelegate).shutdown();
185         verify(mockDelegate).awaitTermination(3, TimeUnit.SECONDS);
186         verify(mockDelegate).shutdownNow();
187         verify(mockDelegate).isShutdown();
188         verify(mockDelegate).isTerminated();
189     }
190 }