2 * Copyright (c) 2014 Brocade Communications Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.yangtools.util.concurrent;
11 import static org.junit.Assert.assertEquals;
12 import static org.junit.Assert.fail;
13 import static org.mockito.Mockito.mock;
15 import com.google.common.base.Stopwatch;
16 import com.google.common.util.concurrent.Uninterruptibles;
17 import java.util.ArrayList;
18 import java.util.Arrays;
19 import java.util.Collections;
20 import java.util.List;
21 import java.util.concurrent.CountDownLatch;
22 import java.util.concurrent.ExecutorService;
23 import java.util.concurrent.Executors;
24 import java.util.concurrent.LinkedBlockingQueue;
25 import java.util.concurrent.ThreadPoolExecutor;
26 import java.util.concurrent.TimeUnit;
27 import org.junit.After;
28 import org.junit.Test;
29 import org.slf4j.Logger;
30 import org.slf4j.LoggerFactory;
33 * Unit tests for QueuedNotificationManager.
35 * @author Thomas Pantelis
37 public class QueuedNotificationManagerTest {
39 static class TestListener<N> {
41 private final List<N> actual;
42 private volatile int expCount;
43 private volatile CountDownLatch latch;
44 volatile long sleepTime = 0;
45 volatile RuntimeException runtimeEx;
46 volatile Error jvmError;
47 boolean cacheNotifications = true;
50 TestListener(final int expCount, final int id) {
51 name = "TestListener " + id;
52 actual = Collections.synchronizedList(new ArrayList<>(expCount));
56 void reset(final int expCount) {
57 this.expCount = expCount;
58 latch = new CountDownLatch(expCount);
62 void onNotification(final N data) {
66 Uninterruptibles.sleepUninterruptibly( sleepTime, TimeUnit.MILLISECONDS);
69 if (cacheNotifications) {
73 RuntimeException localRuntimeEx = runtimeEx;
74 if (localRuntimeEx != null) {
79 Error localJvmError = jvmError;
80 if (localJvmError != null) {
90 void verifyNotifications() {
91 boolean done = Uninterruptibles.awaitUninterruptibly(latch, 10, TimeUnit.SECONDS);
93 long actualCount = latch.getCount();
94 fail(name + ": Received " + (expCount - actualCount) + " notifications. Expected " + expCount);
98 void verifyNotifications(final List<N> expected) {
99 verifyNotifications();
100 assertEquals(name + ": Notifications", expected, actual);
103 // Implement bad hashCode/equals methods to verify it doesn't screw up the
104 // QueuedNotificationManager as it should use reference identity.
106 public int hashCode() {
111 public boolean equals(final Object obj) {
112 TestListener<?> other = (TestListener<?>) obj;
113 return other != null;
117 static class TestListener2<N> extends TestListener<N> {
118 TestListener2(final int expCount, final int id) {
123 static class TestListener3<N> extends TestListener<N> {
124 TestListener3(final int expCount, final int id) {
129 static class TestNotifier<N> implements QueuedNotificationManager.Invoker<TestListener<N>, N> {
131 public void invokeListener(final TestListener<N> listener, final N notification) {
132 listener.onNotification(notification);
136 private static final Logger LOG = LoggerFactory.getLogger(QueuedNotificationManagerTest.class);
137 private ExecutorService queueExecutor;
140 public void tearDown() {
141 if (queueExecutor != null) {
142 queueExecutor.shutdownNow();
147 public void testNotificationsWithSingleListener() {
149 queueExecutor = Executors.newFixedThreadPool( 2 );
150 NotificationManager<TestListener<Integer>, Integer> manager = new QueuedNotificationManager<>(queueExecutor,
151 new TestNotifier<>(), 10, "TestMgr" );
153 int initialCount = 6;
154 int nNotifications = 100;
156 TestListener<Integer> listener = new TestListener<>(nNotifications, 1);
157 listener.sleepTime = 20;
159 manager.submitNotifications(listener, Arrays.asList(1, 2));
160 manager.submitNotification(listener, 3);
161 manager.submitNotifications(listener, Arrays.asList(4, 5));
162 manager.submitNotification(listener, 6);
164 manager.submitNotifications(null, Collections.emptyList());
165 manager.submitNotifications(listener, null);
166 manager.submitNotification(listener, null);
168 Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
170 listener.sleepTime = 0;
172 List<Integer> expNotifications = new ArrayList<>(nNotifications);
173 expNotifications.addAll(Arrays.asList(1, 2, 3, 4, 5, 6));
174 for (int i = 1; i <= nNotifications - initialCount; i++) {
175 Integer v = Integer.valueOf(initialCount + i);
176 expNotifications.add(v);
177 manager.submitNotification(listener, v);
180 listener.verifyNotifications( expNotifications );
184 public void testNotificationsWithMultipleListeners() throws InterruptedException {
187 queueExecutor = Executors.newFixedThreadPool(nListeners);
188 final ExecutorService stagingExecutor = Executors.newFixedThreadPool(nListeners);
189 final NotificationManager<TestListener<Integer>, Integer> manager = new QueuedNotificationManager<>(
190 queueExecutor, new TestNotifier<>(), 5000, "TestMgr" );
192 final int nNotifications = 100000;
194 LOG.info("Testing {} listeners with {} notifications each...", nListeners, nNotifications);
196 final Integer[] notifications = new Integer[nNotifications];
197 for (int i = 1; i <= nNotifications; i++) {
198 notifications[i - 1] = Integer.valueOf(i);
201 Stopwatch stopWatch = Stopwatch.createStarted();
203 List<TestListener<Integer>> listeners = new ArrayList<>();
204 List<Thread> threads = new ArrayList<>();
205 for (int i = 1; i <= nListeners; i++) {
206 final TestListener<Integer> listener =
207 i == 2 ? new TestListener2<>(nNotifications, i) :
208 i == 3 ? new TestListener3<>(nNotifications, i) :
209 new TestListener<>(nNotifications, i);
210 listeners.add(listener);
212 final Thread t = new Thread(() -> {
213 for (int j = 1; j <= nNotifications; j++) {
214 final Integer n = notifications[j - 1];
215 stagingExecutor.execute(() -> manager.submitNotification(listener, n));
225 for (TestListener<Integer> listener: listeners) {
226 listener.verifyNotifications();
227 LOG.info("{} succeeded", listener.name);
230 stagingExecutor.shutdownNow();
235 LOG.info("Elapsed time: {}", stopWatch);
236 LOG.info("Executor: {}", queueExecutor);
238 for (Thread t : threads) {
244 public void testNotificationsWithListenerRuntimeEx() {
246 queueExecutor = Executors.newFixedThreadPool(1);
247 NotificationManager<TestListener<Integer>, Integer> manager =
248 new QueuedNotificationManager<>( queueExecutor, new TestNotifier<>(),
252 TestListener<Integer> listener = new TestListener<>(2, 1);
253 listener.runtimeEx = mock(RuntimeException.class);
255 manager.submitNotification(listener, 1);
256 manager.submitNotification(listener, 2);
258 listener.verifyNotifications();
262 public void testNotificationsWithListenerJVMError() {
264 final CountDownLatch errorCaughtLatch = new CountDownLatch(1);
265 queueExecutor = new ThreadPoolExecutor(1, 1, 0, TimeUnit.SECONDS, new LinkedBlockingQueue<>()) {
267 public void execute(final Runnable command) {
268 super.execute(() -> {
272 errorCaughtLatch.countDown();
278 NotificationManager<TestListener<Integer>, Integer> manager = new QueuedNotificationManager<>(queueExecutor,
279 new TestNotifier<>(), 10, "TestMgr");
281 TestListener<Integer> listener = new TestListener<>(2, 1);
282 listener.jvmError = mock(Error.class);
284 manager.submitNotification(listener, 1);
286 assertEquals("JVM Error caught", true, Uninterruptibles.awaitUninterruptibly(
287 errorCaughtLatch, 5, TimeUnit.SECONDS));
289 manager.submitNotification(listener, 2);
291 listener.verifyNotifications();