2 * Copyright (c) 2014 Brocade Communications Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.yangtools.util.concurrent;
11 import static org.junit.Assert.assertEquals;
12 import static org.junit.Assert.fail;
13 import static org.mockito.Matchers.any;
14 import static org.mockito.Mockito.doNothing;
15 import static org.mockito.Mockito.mock;
17 import com.google.common.base.Stopwatch;
18 import com.google.common.util.concurrent.Uninterruptibles;
19 import java.io.PrintStream;
20 import java.util.ArrayList;
21 import java.util.Arrays;
22 import java.util.Collections;
23 import java.util.List;
24 import java.util.concurrent.CountDownLatch;
25 import java.util.concurrent.ExecutorService;
26 import java.util.concurrent.Executors;
27 import java.util.concurrent.LinkedBlockingQueue;
28 import java.util.concurrent.ThreadPoolExecutor;
29 import java.util.concurrent.TimeUnit;
30 import org.junit.After;
31 import org.junit.Test;
32 import org.slf4j.Logger;
33 import org.slf4j.LoggerFactory;
36 * Unit tests for QueuedNotificationManager.
38 * @author Thomas Pantelis
40 public class QueuedNotificationManagerTest {
// Listener test double used by the tests in this file. It exposes knobs that the tests set
// directly (sleep time, an exception or error instance, a caching flag) and offers
// verifyNotifications() helpers that wait on a latch sized by the expected notification count.
// NOTE(review): this listing carries embedded line numbers and skips several original lines
// (see the numbering gaps), so parts of the method bodies below are not visible here.
42 static class TestListener<N> {
// Notifications recorded by this listener; created as a synchronized list in the constructor.
44 private final List<N> actual;
// Expected number of notifications; set in reset() and used in the failure message.
45 private volatile int expCount;
// Latch awaited by verifyNotifications(); re-created with the expected count in reset().
46 private volatile CountDownLatch latch;
// Delay in milliseconds handed to sleepUninterruptibly() in onNotification().
47 volatile long sleepTime = 0;
// Optional exception read in onNotification(); presumably thrown there when non-null
// (the throw statement is not visible in this listing).
48 volatile RuntimeException runtimeEx;
// Optional Error read in onNotification(); presumably thrown there when non-null
// (the throw statement is not visible in this listing).
49 volatile Error jvmError;
// Checked in onNotification(); presumably controls whether data is stored in `actual` - TODO confirm.
50 boolean cacheNotifications = true;
// Builds a listener named "TestListener <id>" whose backing list is pre-sized to expCount.
// The remaining constructor lines are not visible here.
53 TestListener(final int expCount, final int id) {
54 name = "TestListener " + id;
55 actual = Collections.synchronizedList(new ArrayList<>(expCount));
// Re-arms the listener: stores the new expected count and creates a fresh latch of that size.
59 void reset(final int expCount) {
60 this.expCount = expCount;
61 latch = new CountDownLatch(expCount);
// Callback invoked with each notification (TestNotifier forwards to it). The visible lines
// sleep for sleepTime ms, test cacheNotifications, and take local copies of the volatile
// runtimeEx/jvmError fields before null-checking them, so the check and any later use see
// the same value.
65 void onNotification(final N data) {
69 Uninterruptibles.sleepUninterruptibly( sleepTime, TimeUnit.MILLISECONDS);
72 if (cacheNotifications) {
76 RuntimeException localRuntimeEx = runtimeEx;
77 if (localRuntimeEx != null) {
82 Error localJvmError = jvmError;
83 if (localJvmError != null) {
// Waits up to 10 seconds for the latch. The guard around the failure below is not visible in
// this listing; presumably fail() is only reached when the wait timed out - TODO confirm.
93 void verifyNotifications() {
94 boolean done = Uninterruptibles.awaitUninterruptibly(latch, 10, TimeUnit.SECONDS);
// latch.getCount() is the count still outstanding, so expCount minus it is the number received.
96 long actualCount = latch.getCount();
97 fail(name + ": Received " + (expCount - actualCount) + " notifications. Expected " + expCount);
// Same wait as above, then asserts that the recorded list equals `expected`.
101 void verifyNotifications(final List<N> expected) {
102 verifyNotifications();
103 assertEquals(name + ": Notifications", expected, actual);
106 // Implement bad hashCode/equals methods to verify it doesn't screw up the
107 // QueuedNotificationManager as it should use reference identity.
// The hashCode() body is not visible in this listing.
109 public int hashCode() {
// Deliberately broken equality: any non-null TestListener compares equal. The unconditional
// cast means an argument of another type raises ClassCastException.
114 public boolean equals(final Object obj) {
115 TestListener<?> other = (TestListener<?>) obj;
116 return other != null;
// Subclass of TestListener with the same constructor signature; the constructor body is not
// visible in this listing. testNotificationsWithMultipleListeners uses it for listener #2,
// presumably so that listeners of different classes are exercised - TODO confirm.
120 static class TestListener2<N> extends TestListener<N> {
121 TestListener2(final int expCount, final int id) {
// Subclass of TestListener with the same constructor signature; the constructor body is not
// visible in this listing. testNotificationsWithMultipleListeners uses it for listener #3,
// presumably so that listeners of different classes are exercised - TODO confirm.
126 static class TestListener3<N> extends TestListener<N> {
127 TestListener3(final int expCount, final int id) {
// Invoker implementation handed to QueuedNotificationManager by the tests: it delivers a
// notification by calling the listener's onNotification() directly.
132 static class TestNotifier<N> implements QueuedNotificationManager.Invoker<TestListener<N>, N> {
134 public void invokeListener(final TestListener<N> listener, final N notification) {
135 listener.onNotification(notification);
// Logger used by the multi-listener test for progress and timing output.
139 private static final Logger LOG = LoggerFactory.getLogger(QueuedNotificationManagerTest.class);
// Executor given to the manager under test; assigned by each test and shut down in tearDown().
140 private ExecutorService queueExecutor;
// Stops the executor created by the test, if one was created. shutdownNow() is used, so
// queued tasks are dropped rather than awaited.
// NOTE(review): the annotation line is not visible in this listing; presumably @After, which
// the file imports - confirm.
143 public void tearDown() {
144 if (queueExecutor != null) {
145 queueExecutor.shutdownNow();
// Submits 100 integers to a single listener, through both the single and the batch submit
// methods, and checks that the listener records exactly 1..100 in that order.
150 public void testNotificationsWithSingleListener() {
152 queueExecutor = Executors.newFixedThreadPool( 2 );
// NOTE(review): the numeric argument (10) is presumably the per-listener queue size - confirm
// against the QueuedNotificationManager constructor.
153 NotificationManager<TestListener<Integer>, Integer> manager = new QueuedNotificationManager<>(queueExecutor,
154 new TestNotifier<>(), 10, "TestMgr" );
156 int initialCount = 6;
157 int nNotifications = 100;
159 TestListener<Integer> listener = new TestListener<>(nNotifications, 1);
// Slow the listener down (20 ms per notification) while the first six values are submitted.
160 listener.sleepTime = 20;
162 manager.submitNotifications(listener, Arrays.asList(1, 2));
163 manager.submitNotification(listener, 3);
164 manager.submitNotifications(listener, Arrays.asList(4, 5));
165 manager.submitNotification(listener, 6);
// Null listener / null notification submissions: none of them appears in the expected list
// below, so they must not produce a delivery.
167 manager.submitNotifications(null, Collections.emptyList());
168 manager.submitNotifications(listener, null);
169 manager.submitNotification(listener, null);
// Pause, then remove the artificial delay before submitting the remaining values.
171 Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
173 listener.sleepTime = 0;
// Build the expected sequence 1..100 while submitting 7..100 one at a time.
175 List<Integer> expNotifications = new ArrayList<>(nNotifications);
176 expNotifications.addAll(Arrays.asList(1, 2, 3, 4, 5, 6));
177 for (int i = 1; i <= nNotifications - initialCount; i++) {
178 Integer v = Integer.valueOf(initialCount + i);
179 expNotifications.add(v);
180 manager.submitNotification(listener, v);
// Waits for all notifications and asserts the recorded list equals the expected one.
183 listener.verifyNotifications( expNotifications );
// Load test: each of nListeners listeners is fed 100000 notifications, submitted through a
// separate staging executor, and each listener is then checked for the expected count.
// NOTE(review): the declaration of nListeners and the lines that start and join the feeder
// threads are not visible in this listing.
187 public void testNotificationsWithMultipleListeners() throws InterruptedException {
190 queueExecutor = Executors.newFixedThreadPool(nListeners);
191 final ExecutorService stagingExecutor = Executors.newFixedThreadPool(nListeners);
// NOTE(review): the numeric argument (5000) is presumably the per-listener queue size - confirm.
192 final NotificationManager<TestListener<Integer>, Integer> manager = new QueuedNotificationManager<>(
193 queueExecutor, new TestNotifier<>(), 5000, "TestMgr" );
195 final int nNotifications = 100000;
197 LOG.info("Testing {} listeners with {} notifications each...", nListeners, nNotifications);
// Pre-box the values 1..nNotifications once so that all listeners share the same instances.
199 final Integer[] notifications = new Integer[nNotifications];
200 for (int i = 1; i <= nNotifications; i++) {
201 notifications[i - 1] = Integer.valueOf(i);
204 Stopwatch stopWatch = Stopwatch.createStarted();
206 List<TestListener<Integer>> listeners = new ArrayList<>();
207 List<Thread> threads = new ArrayList<>();
208 for (int i = 1; i <= nListeners; i++) {
// Listeners #2 and #3 use the TestListener2/TestListener3 subclasses; all others use the base class.
209 final TestListener<Integer> listener =
210 i == 2 ? new TestListener2<>(nNotifications, i) :
211 i == 3 ? new TestListener3<>(nNotifications, i) :
212 new TestListener<>(nNotifications, i);
213 listeners.add(listener);
// One feeder thread per listener; every submission is handed to the staging executor rather
// than being made on the feeder thread itself.
215 final Thread t = new Thread(() -> {
216 for (int j = 1; j <= nNotifications; j++) {
217 final Integer n = notifications[j - 1];
218 stagingExecutor.execute(() -> manager.submitNotification(listener, n));
// Only the count is verified here (the no-argument verifyNotifications()), not the order.
228 for (TestListener<Integer> listener: listeners) {
229 listener.verifyNotifications();
230 LOG.info("{} succeeded", listener.name);
233 stagingExecutor.shutdownNow();
238 LOG.info("Elapsed time: {}", stopWatch);
239 LOG.info("Executor: {}", queueExecutor);
// Loop body is not visible in this listing; presumably joins each feeder thread - TODO confirm.
241 for (Thread t : threads) {
// Configures the listener with a RuntimeException instance, submits two notifications, and
// waits for both to be counted. Presumably this checks that delivery continues after the
// listener throws - TODO confirm against the lines missing from onNotification().
247 public void testNotificationsWithListenerRuntimeEx() {
249 queueExecutor = Executors.newFixedThreadPool(1);
// NOTE(review): the remaining constructor arguments are on lines not visible in this listing.
250 NotificationManager<TestListener<Integer>, Integer> manager =
251 new QueuedNotificationManager<>( queueExecutor, new TestNotifier<>(),
255 TestListener<Integer> listener = new TestListener<>(2, 1);
// Use a mocked exception whose printStackTrace(PrintStream) is stubbed to do nothing.
256 final RuntimeException mockedRuntimeException = mock(RuntimeException.class);
257 doNothing().when(mockedRuntimeException).printStackTrace(any(PrintStream.class));
258 listener.runtimeEx = mockedRuntimeException;
260 manager.submitNotification(listener, 1);
261 manager.submitNotification(listener, 2);
263 listener.verifyNotifications();
// Configures the listener with a mocked Error, submits one notification, waits up to 5 seconds
// for the executor wrapper to signal errorCaughtLatch, then submits a second notification and
// waits for the listener's expected count of two.
267 public void testNotificationsWithListenerJVMError() {
269 final CountDownLatch errorCaughtLatch = new CountDownLatch(1);
// Single-threaded executor whose execute() wraps each submitted command.
// NOTE(review): most of the wrapper body is not visible in this listing; presumably it runs
// the command and counts the latch down when an Error escapes - TODO confirm.
270 queueExecutor = new ThreadPoolExecutor(1, 1, 0, TimeUnit.SECONDS, new LinkedBlockingQueue<>()) {
272 public void execute(final Runnable command) {
273 super.execute(() -> {
277 errorCaughtLatch.countDown();
283 NotificationManager<TestListener<Integer>, Integer> manager = new QueuedNotificationManager<>(queueExecutor,
284 new TestNotifier<>(), 10, "TestMgr");
286 TestListener<Integer> listener = new TestListener<>(2, 1);
287 listener.jvmError = mock(Error.class);
289 manager.submitNotification(listener, 1);
// The second notification is only submitted after the latch has been signalled.
291 assertEquals("JVM Error caught", true, Uninterruptibles.awaitUninterruptibly(
292 errorCaughtLatch, 5, TimeUnit.SECONDS));
294 manager.submitNotification(listener, 2);
296 listener.verifyNotifications();