2 * Copyright (c) 2014 Brocade Communications Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.yangtools.util.concurrent;
11 import static org.junit.Assert.assertEquals;
12 import static org.junit.Assert.assertTrue;
13 import static org.junit.Assert.fail;
14 import static org.mockito.Mockito.mock;
16 import com.google.common.base.Stopwatch;
17 import com.google.common.util.concurrent.Uninterruptibles;
18 import java.util.ArrayList;
19 import java.util.Arrays;
20 import java.util.Collection;
21 import java.util.Collections;
22 import java.util.List;
23 import java.util.concurrent.CountDownLatch;
24 import java.util.concurrent.ExecutorService;
25 import java.util.concurrent.Executors;
26 import java.util.concurrent.LinkedBlockingQueue;
27 import java.util.concurrent.ThreadPoolExecutor;
28 import java.util.concurrent.TimeUnit;
29 import org.junit.After;
30 import org.junit.Test;
31 import org.opendaylight.yangtools.util.concurrent.QueuedNotificationManager.BatchedInvoker;
32 import org.slf4j.Logger;
33 import org.slf4j.LoggerFactory;
36 * Unit tests for QueuedNotificationManager.
38 * @author Thomas Pantelis
40 public class QueuedNotificationManagerTest {
42 static class TestListener<N> {
44 private final List<N> actual;
45 private volatile int expCount;
46 private volatile CountDownLatch latch;
47 volatile long sleepTime = 0;
48 volatile RuntimeException runtimeEx;
49 volatile Error jvmError;
50 boolean cacheNotifications = true;
53 TestListener(final int expCount, final int id) {
54 name = "TestListener " + id;
55 actual = Collections.synchronizedList(new ArrayList<>(expCount));
59 void reset(final int newExpCount) {
60 this.expCount = newExpCount;
61 latch = new CountDownLatch(newExpCount);
65 void onNotification(final Collection<? extends N> data) {
69 Uninterruptibles.sleepUninterruptibly(sleepTime, TimeUnit.MILLISECONDS);
72 if (cacheNotifications) {
76 RuntimeException localRuntimeEx = runtimeEx;
77 if (localRuntimeEx != null) {
82 Error localJvmError = jvmError;
83 if (localJvmError != null) {
89 data.forEach(action -> latch.countDown());
93 void verifyNotifications() {
94 boolean done = Uninterruptibles.awaitUninterruptibly(latch, 5, TimeUnit.SECONDS);
96 long actualCount = latch.getCount();
97 fail(name + ": Received " + (expCount - actualCount) + " notifications. Expected " + expCount);
101 void verifyNotifications(final List<N> expected) {
102 verifyNotifications();
103 assertEquals(name + ": Notifications", expected, actual);
106 // Implement bad hashCode/equals methods to verify it doesn't screw up the
107 // QueuedNotificationManager as it should use reference identity.
109 public int hashCode() {
114 public boolean equals(final Object obj) {
115 TestListener<?> other = (TestListener<?>) obj;
116 return other != null;
120 static class TestListener2<N> extends TestListener<N> {
121 TestListener2(final int expCount, final int id) {
126 static class TestListener3<N> extends TestListener<N> {
127 TestListener3(final int expCount, final int id) {
132 static class TestNotifier<N> implements BatchedInvoker<TestListener<N>, N> {
134 public void invokeListener(final TestListener<N> listener, final Collection<? extends N> notifications) {
135 listener.onNotification(notifications);
139 private static final Logger LOG = LoggerFactory.getLogger(QueuedNotificationManagerTest.class);
140 private ExecutorService queueExecutor;
143 public void tearDown() {
144 if (queueExecutor != null) {
145 queueExecutor.shutdownNow();
149 @Test(timeout = 10000)
150 public void testNotificationsWithSingleListener() {
152 queueExecutor = Executors.newFixedThreadPool(2);
153 NotificationManager<TestListener<Integer>, Integer> manager = QueuedNotificationManager.create(queueExecutor,
154 new TestNotifier<>(), 10, "TestMgr");
158 TestListener<Integer> listener = new TestListener<>(count, 1);
159 listener.sleepTime = 20;
161 manager.submitNotifications(listener, Arrays.asList(1, 2));
162 manager.submitNotification(listener, 3);
163 manager.submitNotifications(listener, Arrays.asList(4, 5));
164 manager.submitNotification(listener, 6);
166 manager.submitNotifications(null, Collections.emptyList());
167 manager.submitNotifications(listener, null);
168 manager.submitNotification(listener, null);
170 Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
172 listener.sleepTime = 0;
174 List<Integer> expNotifications = new ArrayList<>(count);
175 expNotifications.addAll(Arrays.asList(1, 2, 3, 4, 5, 6));
176 int initialCount = 6;
177 for (int i = 1; i <= count - initialCount; i++) {
178 Integer val = Integer.valueOf(initialCount + i);
179 expNotifications.add(val);
180 manager.submitNotification(listener, val);
183 listener.verifyNotifications(expNotifications);
187 public void testNotificationsWithMultipleListeners() throws InterruptedException {
190 queueExecutor = Executors.newFixedThreadPool(count);
191 final ExecutorService stagingExecutor = Executors.newFixedThreadPool(count);
192 final NotificationManager<TestListener<Integer>, Integer> manager = QueuedNotificationManager.create(
193 queueExecutor, new TestNotifier<>(), 5000, "TestMgr");
195 final int nNotifications = 100000;
197 LOG.info("Testing {} listeners with {} notifications each...", count, nNotifications);
199 final Integer[] notifications = new Integer[nNotifications];
200 for (int i = 1; i <= nNotifications; i++) {
201 notifications[i - 1] = Integer.valueOf(i);
204 Stopwatch stopWatch = Stopwatch.createStarted();
206 List<TestListener<Integer>> listeners = new ArrayList<>();
207 List<Thread> threads = new ArrayList<>();
208 for (int i = 1; i <= count; i++) {
209 final TestListener<Integer> listener =
210 i == 2 ? new TestListener2<>(nNotifications, i) :
211 i == 3 ? new TestListener3<>(nNotifications, i) :
212 new TestListener<>(nNotifications, i);
213 listeners.add(listener);
215 final Thread t = new Thread(() -> {
216 for (int j = 1; j <= nNotifications; j++) {
217 final Integer n = notifications[j - 1];
218 stagingExecutor.execute(() -> manager.submitNotification(listener, n));
228 for (TestListener<Integer> listener: listeners) {
229 listener.verifyNotifications();
230 LOG.info("{} succeeded", listener.name);
233 stagingExecutor.shutdownNow();
238 LOG.info("Elapsed time: {}", stopWatch);
239 LOG.info("Executor: {}", queueExecutor);
241 for (Thread t : threads) {
246 @Test(timeout = 10000)
247 public void testNotificationsWithListenerRuntimeEx() {
249 queueExecutor = Executors.newFixedThreadPool(1);
250 NotificationManager<TestListener<Integer>, Integer> manager = QueuedNotificationManager.create(queueExecutor,
251 new TestNotifier<>(), 10, "TestMgr");
253 TestListener<Integer> listener = new TestListener<>(2, 1);
254 final RuntimeException mockedRuntimeException = new RuntimeException("mock");
255 listener.runtimeEx = mockedRuntimeException;
257 manager.submitNotification(listener, 1);
258 manager.submitNotification(listener, 2);
260 listener.verifyNotifications();
261 List<Runnable> tasks = queueExecutor.shutdownNow();
262 assertTrue(tasks.isEmpty());
265 @Test(timeout = 10000)
266 public void testNotificationsWithListenerJVMError() {
268 final CountDownLatch errorCaughtLatch = new CountDownLatch(1);
269 queueExecutor = new ThreadPoolExecutor(1, 1, 0, TimeUnit.SECONDS, new LinkedBlockingQueue<>()) {
271 @SuppressWarnings("checkstyle:illegalCatch")
272 public void execute(final Runnable command) {
273 super.execute(() -> {
277 errorCaughtLatch.countDown();
283 NotificationManager<TestListener<Integer>, Integer> manager = QueuedNotificationManager.create(queueExecutor,
284 new TestNotifier<>(), 10, "TestMgr");
286 TestListener<Integer> listener = new TestListener<>(2, 1);
287 listener.jvmError = mock(Error.class);
289 manager.submitNotification(listener, 1);
291 assertTrue("JVM Error caught", Uninterruptibles.awaitUninterruptibly(errorCaughtLatch, 5, TimeUnit.SECONDS));
293 manager.submitNotification(listener, 2);
295 listener.verifyNotifications();
296 List<Runnable> tasks = queueExecutor.shutdownNow();
297 assertTrue(tasks.isEmpty());