Migrate common/util to JUnit5
[yangtools.git] / common / util / src / test / java / org / opendaylight / yangtools / util / concurrent / QueuedNotificationManagerTest.java
1 /*
2  * Copyright (c) 2014 Brocade Communications Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.yangtools.util.concurrent;
9
10 import static org.junit.jupiter.api.Assertions.assertEquals;
11 import static org.junit.jupiter.api.Assertions.assertTrue;
12 import static org.junit.jupiter.api.Assertions.fail;
13 import static org.mockito.Mockito.mock;
14
15 import com.google.common.base.Stopwatch;
16 import com.google.common.collect.ImmutableList;
17 import com.google.common.util.concurrent.Uninterruptibles;
18 import java.util.ArrayList;
19 import java.util.Arrays;
20 import java.util.Collection;
21 import java.util.Collections;
22 import java.util.List;
23 import java.util.concurrent.CountDownLatch;
24 import java.util.concurrent.ExecutorService;
25 import java.util.concurrent.Executors;
26 import java.util.concurrent.LinkedBlockingQueue;
27 import java.util.concurrent.ThreadPoolExecutor;
28 import java.util.concurrent.TimeUnit;
29 import org.junit.jupiter.api.AfterEach;
30 import org.junit.jupiter.api.Test;
31 import org.junit.jupiter.api.Timeout;
32 import org.opendaylight.yangtools.util.concurrent.QueuedNotificationManager.BatchedInvoker;
33 import org.slf4j.Logger;
34 import org.slf4j.LoggerFactory;
35
36 /**
37  * Unit tests for QueuedNotificationManager.
38  *
39  * @author Thomas Pantelis
40  */
41 class QueuedNotificationManagerTest {
42
43     static class TestListener<N> {
44
45         private final List<N> actual;
46         private volatile int expCount;
47         private volatile CountDownLatch latch;
48         volatile long sleepTime = 0;
49         volatile RuntimeException runtimeEx;
50         volatile Error jvmError;
51         boolean cacheNotifications = true;
52         String name;
53
54         TestListener(final int expCount, final int id) {
55             name = "TestListener " + id;
56             actual = Collections.synchronizedList(new ArrayList<>(expCount));
57             reset(expCount);
58         }
59
60         void reset(final int newExpCount) {
61             expCount = newExpCount;
62             latch = new CountDownLatch(newExpCount);
63             actual.clear();
64         }
65
66         void onNotification(final Collection<? extends N> data) {
67
68             try {
69                 if (sleepTime > 0) {
70                     Uninterruptibles.sleepUninterruptibly(sleepTime, TimeUnit.MILLISECONDS);
71                 }
72
73                 if (cacheNotifications) {
74                     actual.addAll(data);
75                 }
76
77                 final var localRuntimeEx = runtimeEx;
78                 if (localRuntimeEx != null) {
79                     runtimeEx = null;
80                     throw localRuntimeEx;
81                 }
82
83                 final var localJvmError = jvmError;
84                 if (localJvmError != null) {
85                     jvmError = null;
86                     throw localJvmError;
87                 }
88
89             } finally {
90                 data.forEach(action -> latch.countDown());
91             }
92         }
93
94         void verifyNotifications() {
95             boolean done = Uninterruptibles.awaitUninterruptibly(latch, 5, TimeUnit.SECONDS);
96             if (!done) {
97                 long actualCount = latch.getCount();
98                 fail(name + ": Received " + (expCount - actualCount) + " notifications. Expected " + expCount);
99             }
100         }
101
102         void verifyNotifications(final List<N> expected) {
103             verifyNotifications();
104             assertEquals(expected, actual, name + ": Notifications");
105         }
106
107         // Implement bad hashCode/equals methods to verify it doesn't screw up the
108         // QueuedNotificationManager as it should use reference identity.
109         @Override
110         public int hashCode() {
111             return 1;
112         }
113
114         @Override
115         public boolean equals(final Object obj) {
116             final var other = (TestListener<?>) obj;
117             return other != null;
118         }
119     }
120
121     static class TestListener2<N> extends TestListener<N> {
122         TestListener2(final int expCount, final int id) {
123             super(expCount, id);
124         }
125     }
126
127     static class TestListener3<N> extends TestListener<N> {
128         TestListener3(final int expCount, final int id) {
129             super(expCount, id);
130         }
131     }
132
133     static class TestNotifier<N> implements BatchedInvoker<TestListener<N>, N> {
134         @Override
135         public void invokeListener(final TestListener<N> listener, final ImmutableList<N> notifications) {
136             listener.onNotification(notifications);
137         }
138     }
139
140     private static final Logger LOG = LoggerFactory.getLogger(QueuedNotificationManagerTest.class);
141     private ExecutorService queueExecutor;
142
143     @AfterEach
144     void tearDown() {
145         if (queueExecutor != null) {
146             queueExecutor.shutdownNow();
147         }
148     }
149
150     @Test
151     @Timeout(10000)
152     void testNotificationsWithSingleListener() {
153
154         queueExecutor = Executors.newFixedThreadPool(2);
155         final NotificationManager<TestListener<Integer>, Integer> manager = QueuedNotificationManager.create(
156                 queueExecutor, new TestNotifier<>(), 10, "TestMgr");
157
158         final var count = 100;
159
160         final var listener = new TestListener<Integer>(count, 1);
161         listener.sleepTime = 20;
162
163         manager.submitNotifications(listener, Arrays.asList(1, 2));
164         manager.submitNotification(listener, 3);
165         manager.submitNotifications(listener, Arrays.asList(4, 5));
166         manager.submitNotification(listener, 6);
167
168         manager.submitNotifications(null, Collections.emptyList());
169         manager.submitNotifications(listener, null);
170         manager.submitNotification(listener, null);
171
172         Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
173
174         listener.sleepTime = 0;
175
176         final var expNotifications = new ArrayList<Integer>(count);
177         expNotifications.addAll(Arrays.asList(1, 2, 3, 4, 5, 6));
178         final var initialCount = 6;
179         for (int i = 1; i <= count - initialCount; i++) {
180             Integer val = initialCount + i;
181             expNotifications.add(val);
182             manager.submitNotification(listener, val);
183         }
184
185         listener.verifyNotifications(expNotifications);
186     }
187
188     @Test
189     void testNotificationsWithMultipleListeners() throws InterruptedException {
190
191         final var count = 10;
192         queueExecutor = Executors.newFixedThreadPool(count);
193         final var stagingExecutor = Executors.newFixedThreadPool(count);
194         final NotificationManager<TestListener<Integer>, Integer> manager = QueuedNotificationManager.create(
195                 queueExecutor, new TestNotifier<>(), 5000, "TestMgr");
196
197         final var nNotifications = 100000;
198
199         LOG.info("Testing {} listeners with {} notifications each...", count, nNotifications);
200
201         final var notifications = new Integer[nNotifications];
202         for (int i = 1; i <= nNotifications; i++) {
203             notifications[i - 1] = i;
204         }
205
206         Stopwatch stopWatch = Stopwatch.createStarted();
207
208         final var listeners = new ArrayList<TestListener<Integer>>();
209         final var threads = new ArrayList<Thread>();
210         for (int i = 1; i <= count; i++) {
211             final var listener =
212                     i == 2 ? new TestListener2<Integer>(nNotifications, i) :
213                     i == 3 ? new TestListener3<Integer>(nNotifications, i) :
214                             new TestListener<Integer>(nNotifications, i);
215             listeners.add(listener);
216
217             final var t = new Thread(() -> {
218                 for (int j = 1; j <= nNotifications; j++) {
219                     final Integer n = notifications[j - 1];
220                     stagingExecutor.execute(() -> manager.submitNotification(listener, n));
221                 }
222             });
223
224             t.start();
225             threads.add(t);
226
227         }
228
229         try {
230             for (final var listener: listeners) {
231                 listener.verifyNotifications();
232                 LOG.info("{} succeeded", listener.name);
233             }
234         } finally {
235             stagingExecutor.shutdownNow();
236         }
237
238         stopWatch.stop();
239
240         LOG.info("Elapsed time: {}", stopWatch);
241         LOG.info("Executor: {}", queueExecutor);
242
243         for (Thread t : threads) {
244             t.join();
245         }
246     }
247
248     @Test
249     @Timeout(10000)
250     void testNotificationsWithListenerRuntimeEx() {
251
252         queueExecutor = Executors.newFixedThreadPool(1);
253         final NotificationManager<TestListener<Integer>, Integer> manager = QueuedNotificationManager.create(
254                 queueExecutor, new TestNotifier<>(), 10, "TestMgr");
255
256         final var listener = new TestListener<Integer>(2, 1);
257         final var mockedRuntimeException = new RuntimeException("mock");
258         listener.runtimeEx = mockedRuntimeException;
259
260         manager.submitNotification(listener, 1);
261         manager.submitNotification(listener, 2);
262
263         listener.verifyNotifications();
264         final var tasks = queueExecutor.shutdownNow();
265         assertTrue(tasks.isEmpty());
266     }
267
268     @Test
269     @Timeout(10000)
270     void testNotificationsWithListenerJVMError() {
271
272         final var errorCaughtLatch = new CountDownLatch(1);
273         queueExecutor = new ThreadPoolExecutor(1, 1, 0, TimeUnit.SECONDS, new LinkedBlockingQueue<>()) {
274             @Override
275             @SuppressWarnings("checkstyle:illegalCatch")
276             public void execute(final Runnable command) {
277                 super.execute(() -> {
278                     try {
279                         command.run();
280                     } catch (Error e) {
281                         errorCaughtLatch.countDown();
282                     }
283                 });
284             }
285         };
286
287         final NotificationManager<TestListener<Integer>, Integer> manager = QueuedNotificationManager.create(
288                 queueExecutor, new TestNotifier<>(), 10, "TestMgr");
289
290         final var listener = new TestListener<Integer>(2, 1);
291         listener.jvmError = mock(Error.class);
292
293         manager.submitNotification(listener, 1);
294
295         assertTrue(Uninterruptibles.awaitUninterruptibly(errorCaughtLatch, 5, TimeUnit.SECONDS), "JVM Error caught");
296
297         manager.submitNotification(listener, 2);
298
299         listener.verifyNotifications();
300         final var tasks = queueExecutor.shutdownNow();
301         assertTrue(tasks.isEmpty());
302     }
303 }