69fa995125bb64b9f323a6548499dbe768be076e
[yangtools.git] / common / util / src / test / java / org / opendaylight / yangtools / util / concurrent / QueuedNotificationManagerTest.java
1 /*
2  * Copyright (c) 2014 Brocade Communications Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.yangtools.util.concurrent;
10
11 import static org.junit.Assert.assertEquals;
12 import static org.junit.Assert.fail;
13 import static org.mockito.Matchers.any;
14 import static org.mockito.Mockito.doNothing;
15 import static org.mockito.Mockito.mock;
16
17 import com.google.common.base.Stopwatch;
18 import com.google.common.util.concurrent.Uninterruptibles;
19 import java.io.PrintStream;
20 import java.util.ArrayList;
21 import java.util.Arrays;
22 import java.util.Collections;
23 import java.util.List;
24 import java.util.concurrent.CountDownLatch;
25 import java.util.concurrent.ExecutorService;
26 import java.util.concurrent.Executors;
27 import java.util.concurrent.LinkedBlockingQueue;
28 import java.util.concurrent.ThreadPoolExecutor;
29 import java.util.concurrent.TimeUnit;
30 import org.junit.After;
31 import org.junit.Test;
32 import org.slf4j.Logger;
33 import org.slf4j.LoggerFactory;
34
35 /**
36  * Unit tests for QueuedNotificationManager.
37  *
38  * @author Thomas Pantelis
39  */
40 public class QueuedNotificationManagerTest {
41
42     static class TestListener<N> {
43
44         private final List<N> actual;
45         private volatile int expCount;
46         private volatile CountDownLatch latch;
47         volatile long sleepTime = 0;
48         volatile RuntimeException runtimeEx;
49         volatile Error jvmError;
50         boolean cacheNotifications = true;
51         String name;
52
53         TestListener(final int expCount, final int id) {
54             name = "TestListener " + id;
55             actual = Collections.synchronizedList(new ArrayList<>(expCount));
56             reset(expCount);
57         }
58
59         void reset(final int expCount) {
60             this.expCount = expCount;
61             latch = new CountDownLatch(expCount);
62             actual.clear();
63         }
64
65         void onNotification(final N data) {
66
67             try {
68                 if (sleepTime > 0) {
69                     Uninterruptibles.sleepUninterruptibly( sleepTime, TimeUnit.MILLISECONDS);
70                 }
71
72                 if (cacheNotifications) {
73                     actual.add(data);
74                 }
75
76                 RuntimeException localRuntimeEx = runtimeEx;
77                 if (localRuntimeEx != null) {
78                     runtimeEx = null;
79                     throw localRuntimeEx;
80                 }
81
82                 Error localJvmError = jvmError;
83                 if (localJvmError != null) {
84                     jvmError = null;
85                     throw localJvmError;
86                 }
87
88             } finally {
89                 latch.countDown();
90             }
91         }
92
93         void verifyNotifications() {
94             boolean done = Uninterruptibles.awaitUninterruptibly(latch, 10, TimeUnit.SECONDS);
95             if (!done) {
96                 long actualCount = latch.getCount();
97                 fail(name + ": Received " + (expCount - actualCount) + " notifications. Expected " + expCount);
98             }
99         }
100
101         void verifyNotifications(final List<N> expected) {
102             verifyNotifications();
103             assertEquals(name + ": Notifications", expected, actual);
104         }
105
106         // Implement bad hashCode/equals methods to verify it doesn't screw up the
107         // QueuedNotificationManager as it should use reference identity.
108         @Override
109         public int hashCode() {
110             return 1;
111         }
112
113         @Override
114         public boolean equals(final Object obj) {
115             TestListener<?> other = (TestListener<?>) obj;
116             return other != null;
117         }
118     }
119
120     static class TestListener2<N> extends TestListener<N> {
121         TestListener2(final int expCount, final int id) {
122             super(expCount, id);
123         }
124     }
125
126     static class TestListener3<N> extends TestListener<N> {
127         TestListener3(final int expCount, final int id) {
128             super(expCount, id);
129         }
130     }
131
132     static class TestNotifier<N> implements QueuedNotificationManager.Invoker<TestListener<N>, N> {
133         @Override
134         public void invokeListener(final TestListener<N> listener, final N notification) {
135             listener.onNotification(notification);
136         }
137     }
138
139     private static final Logger LOG = LoggerFactory.getLogger(QueuedNotificationManagerTest.class);
140     private ExecutorService queueExecutor;
141
142     @After
143     public void tearDown() {
144         if (queueExecutor != null) {
145             queueExecutor.shutdownNow();
146         }
147     }
148
149     @Test(timeout=10000)
150     public void testNotificationsWithSingleListener() {
151
152         queueExecutor = Executors.newFixedThreadPool( 2 );
153         NotificationManager<TestListener<Integer>, Integer> manager = new QueuedNotificationManager<>(queueExecutor,
154                 new TestNotifier<>(), 10, "TestMgr" );
155
156         int initialCount = 6;
157         int nNotifications = 100;
158
159         TestListener<Integer> listener = new TestListener<>(nNotifications, 1);
160         listener.sleepTime = 20;
161
162         manager.submitNotifications(listener, Arrays.asList(1, 2));
163         manager.submitNotification(listener, 3);
164         manager.submitNotifications(listener, Arrays.asList(4, 5));
165         manager.submitNotification(listener, 6);
166
167         manager.submitNotifications(null, Collections.emptyList());
168         manager.submitNotifications(listener, null);
169         manager.submitNotification(listener, null);
170
171         Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
172
173         listener.sleepTime = 0;
174
175         List<Integer> expNotifications = new ArrayList<>(nNotifications);
176         expNotifications.addAll(Arrays.asList(1, 2, 3, 4, 5, 6));
177         for (int i = 1; i <= nNotifications - initialCount; i++) {
178             Integer v = Integer.valueOf(initialCount + i);
179             expNotifications.add(v);
180             manager.submitNotification(listener, v);
181         }
182
183         listener.verifyNotifications( expNotifications );
184     }
185
186     @Test
187     public void testNotificationsWithMultipleListeners() throws InterruptedException {
188
189         int nListeners = 10;
190         queueExecutor = Executors.newFixedThreadPool(nListeners);
191         final ExecutorService stagingExecutor = Executors.newFixedThreadPool(nListeners);
192         final NotificationManager<TestListener<Integer>, Integer> manager = new QueuedNotificationManager<>(
193                 queueExecutor, new TestNotifier<>(), 5000, "TestMgr" );
194
195         final int nNotifications = 100000;
196
197         LOG.info("Testing {} listeners with {} notifications each...",  nListeners, nNotifications);
198
199         final Integer[] notifications = new Integer[nNotifications];
200         for (int i = 1; i <= nNotifications; i++) {
201             notifications[i - 1] = Integer.valueOf(i);
202         }
203
204         Stopwatch stopWatch = Stopwatch.createStarted();
205
206         List<TestListener<Integer>> listeners = new ArrayList<>();
207         List<Thread> threads = new ArrayList<>();
208         for (int i = 1; i <= nListeners; i++) {
209             final TestListener<Integer> listener =
210                     i == 2 ? new TestListener2<>(nNotifications, i) :
211                     i == 3 ? new TestListener3<>(nNotifications, i) :
212                             new TestListener<>(nNotifications, i);
213             listeners.add(listener);
214
215             final Thread t = new Thread(() -> {
216                 for (int j = 1; j <= nNotifications; j++) {
217                     final Integer n = notifications[j - 1];
218                     stagingExecutor.execute(() -> manager.submitNotification(listener, n));
219                 }
220             });
221
222             t.start();
223             threads.add(t);
224
225         }
226
227         try {
228             for (TestListener<Integer> listener: listeners) {
229                 listener.verifyNotifications();
230                 LOG.info("{} succeeded", listener.name);
231             }
232         } finally {
233             stagingExecutor.shutdownNow();
234         }
235
236         stopWatch.stop();
237
238         LOG.info("Elapsed time: {}", stopWatch);
239         LOG.info("Executor: {}", queueExecutor);
240
241         for (Thread t : threads) {
242             t.join();
243         }
244     }
245
246     @Test(timeout=10000)
247     public void testNotificationsWithListenerRuntimeEx() {
248
249         queueExecutor = Executors.newFixedThreadPool(1);
250         NotificationManager<TestListener<Integer>, Integer> manager =
251                 new QueuedNotificationManager<>( queueExecutor, new TestNotifier<>(),
252                 10, "TestMgr" );
253
254
255         TestListener<Integer> listener = new TestListener<>(2, 1);
256         final RuntimeException mockedRuntimeException = mock(RuntimeException.class);
257         doNothing().when(mockedRuntimeException).printStackTrace(any(PrintStream.class));
258         listener.runtimeEx = mockedRuntimeException;
259
260         manager.submitNotification(listener, 1);
261         manager.submitNotification(listener, 2);
262
263         listener.verifyNotifications();
264     }
265
266     @Test(timeout=10000)
267     public void testNotificationsWithListenerJVMError() {
268
269         final CountDownLatch errorCaughtLatch = new CountDownLatch(1);
270         queueExecutor = new ThreadPoolExecutor(1, 1, 0, TimeUnit.SECONDS, new LinkedBlockingQueue<>()) {
271              @Override
272              public void execute(final Runnable command) {
273                  super.execute(() -> {
274                      try {
275                          command.run();
276                      } catch (Error e) {
277                          errorCaughtLatch.countDown();
278                      }
279                  });
280              }
281         };
282
283         NotificationManager<TestListener<Integer>, Integer> manager = new QueuedNotificationManager<>(queueExecutor,
284                 new TestNotifier<>(), 10, "TestMgr");
285
286         TestListener<Integer> listener = new TestListener<>(2, 1);
287         listener.jvmError = mock(Error.class);
288
289         manager.submitNotification(listener, 1);
290
291         assertEquals("JVM Error caught", true, Uninterruptibles.awaitUninterruptibly(
292                                                        errorCaughtLatch, 5, TimeUnit.SECONDS));
293
294         manager.submitNotification(listener, 2);
295
296         listener.verifyNotifications();
297     }
298 }