2 * Copyright (c) 2014 Brocade Communications Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.yangtools.util.concurrent;
10 import static org.junit.jupiter.api.Assertions.assertEquals;
11 import static org.junit.jupiter.api.Assertions.assertTrue;
12 import static org.junit.jupiter.api.Assertions.fail;
13 import static org.mockito.Mockito.mock;
15 import com.google.common.base.Stopwatch;
16 import com.google.common.collect.ImmutableList;
17 import com.google.common.util.concurrent.Uninterruptibles;
18 import java.util.ArrayList;
19 import java.util.Arrays;
20 import java.util.Collection;
21 import java.util.Collections;
22 import java.util.List;
23 import java.util.concurrent.CountDownLatch;
24 import java.util.concurrent.ExecutorService;
25 import java.util.concurrent.Executors;
26 import java.util.concurrent.LinkedBlockingQueue;
27 import java.util.concurrent.ThreadPoolExecutor;
28 import java.util.concurrent.TimeUnit;
29 import org.junit.jupiter.api.AfterEach;
30 import org.junit.jupiter.api.Test;
31 import org.junit.jupiter.api.Timeout;
32 import org.opendaylight.yangtools.util.concurrent.QueuedNotificationManager.BatchedInvoker;
33 import org.slf4j.Logger;
34 import org.slf4j.LoggerFactory;
37 * Unit tests for QueuedNotificationManager.
39 * @author Thomas Pantelis
41 class QueuedNotificationManagerTest {
43 static class TestListener<N> {
45 private final List<N> actual;
46 private volatile int expCount;
47 private volatile CountDownLatch latch;
48 volatile long sleepTime = 0;
49 volatile RuntimeException runtimeEx;
50 volatile Error jvmError;
51 boolean cacheNotifications = true;
54 TestListener(final int expCount, final int id) {
55 name = "TestListener " + id;
56 actual = Collections.synchronizedList(new ArrayList<>(expCount));
60 void reset(final int newExpCount) {
61 expCount = newExpCount;
62 latch = new CountDownLatch(newExpCount);
66 void onNotification(final Collection<? extends N> data) {
70 Uninterruptibles.sleepUninterruptibly(sleepTime, TimeUnit.MILLISECONDS);
73 if (cacheNotifications) {
77 final var localRuntimeEx = runtimeEx;
78 if (localRuntimeEx != null) {
83 final var localJvmError = jvmError;
84 if (localJvmError != null) {
90 data.forEach(action -> latch.countDown());
94 void verifyNotifications() {
95 boolean done = Uninterruptibles.awaitUninterruptibly(latch, 5, TimeUnit.SECONDS);
97 long actualCount = latch.getCount();
98 fail(name + ": Received " + (expCount - actualCount) + " notifications. Expected " + expCount);
102 void verifyNotifications(final List<N> expected) {
103 verifyNotifications();
104 assertEquals(expected, actual, name + ": Notifications");
107 // Implement bad hashCode/equals methods to verify it doesn't screw up the
108 // QueuedNotificationManager as it should use reference identity.
110 public int hashCode() {
115 public boolean equals(final Object obj) {
116 final var other = (TestListener<?>) obj;
117 return other != null;
121 static class TestListener2<N> extends TestListener<N> {
122 TestListener2(final int expCount, final int id) {
127 static class TestListener3<N> extends TestListener<N> {
128 TestListener3(final int expCount, final int id) {
133 static class TestNotifier<N> implements BatchedInvoker<TestListener<N>, N> {
135 public void invokeListener(final TestListener<N> listener, final ImmutableList<N> notifications) {
136 listener.onNotification(notifications);
140 private static final Logger LOG = LoggerFactory.getLogger(QueuedNotificationManagerTest.class);
141 private ExecutorService queueExecutor;
145 if (queueExecutor != null) {
146 queueExecutor.shutdownNow();
152 void testNotificationsWithSingleListener() {
154 queueExecutor = Executors.newFixedThreadPool(2);
155 final NotificationManager<TestListener<Integer>, Integer> manager = QueuedNotificationManager.create(
156 queueExecutor, new TestNotifier<>(), 10, "TestMgr");
158 final var count = 100;
160 final var listener = new TestListener<Integer>(count, 1);
161 listener.sleepTime = 20;
163 manager.submitNotifications(listener, Arrays.asList(1, 2));
164 manager.submitNotification(listener, 3);
165 manager.submitNotifications(listener, Arrays.asList(4, 5));
166 manager.submitNotification(listener, 6);
168 manager.submitNotifications(null, Collections.emptyList());
169 manager.submitNotifications(listener, null);
170 manager.submitNotification(listener, null);
172 Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
174 listener.sleepTime = 0;
176 final var expNotifications = new ArrayList<Integer>(count);
177 expNotifications.addAll(Arrays.asList(1, 2, 3, 4, 5, 6));
178 final var initialCount = 6;
179 for (int i = 1; i <= count - initialCount; i++) {
180 Integer val = initialCount + i;
181 expNotifications.add(val);
182 manager.submitNotification(listener, val);
185 listener.verifyNotifications(expNotifications);
189 void testNotificationsWithMultipleListeners() throws InterruptedException {
191 final var count = 10;
192 queueExecutor = Executors.newFixedThreadPool(count);
193 final var stagingExecutor = Executors.newFixedThreadPool(count);
194 final NotificationManager<TestListener<Integer>, Integer> manager = QueuedNotificationManager.create(
195 queueExecutor, new TestNotifier<>(), 5000, "TestMgr");
197 final var nNotifications = 100000;
199 LOG.info("Testing {} listeners with {} notifications each...", count, nNotifications);
201 final var notifications = new Integer[nNotifications];
202 for (int i = 1; i <= nNotifications; i++) {
203 notifications[i - 1] = i;
206 Stopwatch stopWatch = Stopwatch.createStarted();
208 final var listeners = new ArrayList<TestListener<Integer>>();
209 final var threads = new ArrayList<Thread>();
210 for (int i = 1; i <= count; i++) {
212 i == 2 ? new TestListener2<Integer>(nNotifications, i) :
213 i == 3 ? new TestListener3<Integer>(nNotifications, i) :
214 new TestListener<Integer>(nNotifications, i);
215 listeners.add(listener);
217 final var t = new Thread(() -> {
218 for (int j = 1; j <= nNotifications; j++) {
219 final Integer n = notifications[j - 1];
220 stagingExecutor.execute(() -> manager.submitNotification(listener, n));
230 for (final var listener: listeners) {
231 listener.verifyNotifications();
232 LOG.info("{} succeeded", listener.name);
235 stagingExecutor.shutdownNow();
240 LOG.info("Elapsed time: {}", stopWatch);
241 LOG.info("Executor: {}", queueExecutor);
243 for (Thread t : threads) {
250 void testNotificationsWithListenerRuntimeEx() {
252 queueExecutor = Executors.newFixedThreadPool(1);
253 final NotificationManager<TestListener<Integer>, Integer> manager = QueuedNotificationManager.create(
254 queueExecutor, new TestNotifier<>(), 10, "TestMgr");
256 final var listener = new TestListener<Integer>(2, 1);
257 final var mockedRuntimeException = new RuntimeException("mock");
258 listener.runtimeEx = mockedRuntimeException;
260 manager.submitNotification(listener, 1);
261 manager.submitNotification(listener, 2);
263 listener.verifyNotifications();
264 final var tasks = queueExecutor.shutdownNow();
265 assertTrue(tasks.isEmpty());
270 void testNotificationsWithListenerJVMError() {
272 final var errorCaughtLatch = new CountDownLatch(1);
273 queueExecutor = new ThreadPoolExecutor(1, 1, 0, TimeUnit.SECONDS, new LinkedBlockingQueue<>()) {
275 @SuppressWarnings("checkstyle:illegalCatch")
276 public void execute(final Runnable command) {
277 super.execute(() -> {
281 errorCaughtLatch.countDown();
287 final NotificationManager<TestListener<Integer>, Integer> manager = QueuedNotificationManager.create(
288 queueExecutor, new TestNotifier<>(), 10, "TestMgr");
290 final var listener = new TestListener<Integer>(2, 1);
291 listener.jvmError = mock(Error.class);
293 manager.submitNotification(listener, 1);
295 assertTrue(Uninterruptibles.awaitUninterruptibly(errorCaughtLatch, 5, TimeUnit.SECONDS), "JVM Error caught");
297 manager.submitNotification(listener, 2);
299 listener.verifyNotifications();
300 final var tasks = queueExecutor.shutdownNow();
301 assertTrue(tasks.isEmpty());