f3275802c5869f0270507a936b473bf9e7477a87
[yangtools.git] / common / util / src / test / java / org / opendaylight / yangtools / util / concurrent / QueuedNotificationManagerTest.java
1 /*
2  * Copyright (c) 2014 Brocade Communications Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.yangtools.util.concurrent;
10
11 import static org.junit.Assert.assertEquals;
12 import static org.junit.Assert.fail;
13 import static org.mockito.Mockito.mock;
14
15 import com.google.common.base.Stopwatch;
16 import com.google.common.util.concurrent.Uninterruptibles;
17 import java.util.ArrayList;
18 import java.util.Arrays;
19 import java.util.Collections;
20 import java.util.List;
21 import java.util.concurrent.CountDownLatch;
22 import java.util.concurrent.ExecutorService;
23 import java.util.concurrent.Executors;
24 import java.util.concurrent.LinkedBlockingQueue;
25 import java.util.concurrent.ThreadPoolExecutor;
26 import java.util.concurrent.TimeUnit;
27 import org.junit.After;
28 import org.junit.Test;
29 import org.slf4j.Logger;
30 import org.slf4j.LoggerFactory;
31
32 /**
33  * Unit tests for QueuedNotificationManager.
34  *
35  * @author Thomas Pantelis
36  */
37 public class QueuedNotificationManagerTest {
38
39     static class TestListener<N> {
40
41         private final List<N> actual;
42         private volatile int expCount;
43         private volatile CountDownLatch latch;
44         volatile long sleepTime = 0;
45         volatile RuntimeException runtimeEx;
46         volatile Error jvmError;
47         boolean cacheNotifications = true;
48         String name;
49
50         TestListener(final int expCount, final int id) {
51             name = "TestListener " + id;
52             actual = Collections.synchronizedList(new ArrayList<>(expCount));
53             reset(expCount);
54         }
55
56         void reset(final int expCount) {
57             this.expCount = expCount;
58             latch = new CountDownLatch(expCount);
59             actual.clear();
60         }
61
62         void onNotification(final N data) {
63
64             try {
65                 if (sleepTime > 0) {
66                     Uninterruptibles.sleepUninterruptibly( sleepTime, TimeUnit.MILLISECONDS);
67                 }
68
69                 if (cacheNotifications) {
70                     actual.add(data);
71                 }
72
73                 RuntimeException localRuntimeEx = runtimeEx;
74                 if (localRuntimeEx != null) {
75                     runtimeEx = null;
76                     throw localRuntimeEx;
77                 }
78
79                 Error localJvmError = jvmError;
80                 if (localJvmError != null) {
81                     jvmError = null;
82                     throw localJvmError;
83                 }
84
85             } finally {
86                 latch.countDown();
87             }
88         }
89
90         void verifyNotifications() {
91             boolean done = Uninterruptibles.awaitUninterruptibly(latch, 10, TimeUnit.SECONDS);
92             if (!done) {
93                 long actualCount = latch.getCount();
94                 fail(name + ": Received " + (expCount - actualCount) + " notifications. Expected " + expCount);
95             }
96         }
97
98         void verifyNotifications(final List<N> expected) {
99             verifyNotifications();
100             assertEquals(name + ": Notifications", expected, actual);
101         }
102
103         // Implement bad hashCode/equals methods to verify it doesn't screw up the
104         // QueuedNotificationManager as it should use reference identity.
105         @Override
106         public int hashCode() {
107             return 1;
108         }
109
110         @Override
111         public boolean equals(final Object obj) {
112             TestListener<?> other = (TestListener<?>) obj;
113             return other != null;
114         }
115     }
116
117     static class TestListener2<N> extends TestListener<N> {
118         TestListener2(final int expCount, final int id) {
119             super(expCount, id);
120         }
121     }
122
123     static class TestListener3<N> extends TestListener<N> {
124         TestListener3(final int expCount, final int id) {
125             super(expCount, id);
126         }
127     }
128
129     static class TestNotifier<N> implements QueuedNotificationManager.Invoker<TestListener<N>, N> {
130         @Override
131         public void invokeListener(final TestListener<N> listener, final N notification) {
132             listener.onNotification(notification);
133         }
134     }
135
136     private static final Logger LOG = LoggerFactory.getLogger(QueuedNotificationManagerTest.class);
137     private ExecutorService queueExecutor;
138
139     @After
140     public void tearDown() {
141         if (queueExecutor != null) {
142             queueExecutor.shutdownNow();
143         }
144     }
145
146     @Test(timeout=10000)
147     public void testNotificationsWithSingleListener() {
148
149         queueExecutor = Executors.newFixedThreadPool( 2 );
150         NotificationManager<TestListener<Integer>, Integer> manager = new QueuedNotificationManager<>(queueExecutor,
151                 new TestNotifier<>(), 10, "TestMgr" );
152
153         int initialCount = 6;
154         int nNotifications = 100;
155
156         TestListener<Integer> listener = new TestListener<>(nNotifications, 1);
157         listener.sleepTime = 20;
158
159         manager.submitNotifications(listener, Arrays.asList(1, 2));
160         manager.submitNotification(listener, 3);
161         manager.submitNotifications(listener, Arrays.asList(4, 5));
162         manager.submitNotification(listener, 6);
163
164         manager.submitNotifications(null, Collections.emptyList());
165         manager.submitNotifications(listener, null);
166         manager.submitNotification(listener, null);
167
168         Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
169
170         listener.sleepTime = 0;
171
172         List<Integer> expNotifications = new ArrayList<>(nNotifications);
173         expNotifications.addAll(Arrays.asList(1, 2, 3, 4, 5, 6));
174         for (int i = 1; i <= nNotifications - initialCount; i++) {
175             Integer v = Integer.valueOf(initialCount + i);
176             expNotifications.add(v);
177             manager.submitNotification(listener, v);
178         }
179
180         listener.verifyNotifications( expNotifications );
181     }
182
183     @Test
184     public void testNotificationsWithMultipleListeners() throws InterruptedException {
185
186         int nListeners = 10;
187         queueExecutor = Executors.newFixedThreadPool(nListeners);
188         final ExecutorService stagingExecutor = Executors.newFixedThreadPool(nListeners);
189         final NotificationManager<TestListener<Integer>, Integer> manager = new QueuedNotificationManager<>(
190                 queueExecutor, new TestNotifier<>(), 5000, "TestMgr" );
191
192         final int nNotifications = 100000;
193
194         LOG.info("Testing {} listeners with {} notifications each...",  nListeners, nNotifications);
195
196         final Integer[] notifications = new Integer[nNotifications];
197         for (int i = 1; i <= nNotifications; i++) {
198             notifications[i - 1] = Integer.valueOf(i);
199         }
200
201         Stopwatch stopWatch = Stopwatch.createStarted();
202
203         List<TestListener<Integer>> listeners = new ArrayList<>();
204         List<Thread> threads = new ArrayList<>();
205         for (int i = 1; i <= nListeners; i++) {
206             final TestListener<Integer> listener =
207                     i == 2 ? new TestListener2<>(nNotifications, i) :
208                     i == 3 ? new TestListener3<>(nNotifications, i) :
209                             new TestListener<>(nNotifications, i);
210             listeners.add(listener);
211
212             final Thread t = new Thread(() -> {
213                 for (int j = 1; j <= nNotifications; j++) {
214                     final Integer n = notifications[j - 1];
215                     stagingExecutor.execute(() -> manager.submitNotification(listener, n));
216                 }
217             });
218
219             t.start();
220             threads.add(t);
221
222         }
223
224         try {
225             for (TestListener<Integer> listener: listeners) {
226                 listener.verifyNotifications();
227                 LOG.info("{} succeeded", listener.name);
228             }
229         } finally {
230             stagingExecutor.shutdownNow();
231         }
232
233         stopWatch.stop();
234
235         LOG.info("Elapsed time: {}", stopWatch);
236         LOG.info("Executor: {}", queueExecutor);
237
238         for (Thread t : threads) {
239             t.join();
240         }
241     }
242
243     @Test(timeout=10000)
244     public void testNotificationsWithListenerRuntimeEx() {
245
246         queueExecutor = Executors.newFixedThreadPool(1);
247         NotificationManager<TestListener<Integer>, Integer> manager =
248                 new QueuedNotificationManager<>( queueExecutor, new TestNotifier<>(),
249                 10, "TestMgr" );
250
251
252         TestListener<Integer> listener = new TestListener<>(2, 1);
253         listener.runtimeEx = mock(RuntimeException.class);
254
255         manager.submitNotification(listener, 1);
256         manager.submitNotification(listener, 2);
257
258         listener.verifyNotifications();
259     }
260
261     @Test(timeout=10000)
262     public void testNotificationsWithListenerJVMError() {
263
264         final CountDownLatch errorCaughtLatch = new CountDownLatch(1);
265         queueExecutor = new ThreadPoolExecutor(1, 1, 0, TimeUnit.SECONDS, new LinkedBlockingQueue<>()) {
266              @Override
267              public void execute(final Runnable command) {
268                  super.execute(() -> {
269                      try {
270                          command.run();
271                      } catch (Error e) {
272                          errorCaughtLatch.countDown();
273                      }
274                  });
275              }
276         };
277
278         NotificationManager<TestListener<Integer>, Integer> manager = new QueuedNotificationManager<>(queueExecutor,
279                 new TestNotifier<>(), 10, "TestMgr");
280
281         TestListener<Integer> listener = new TestListener<>(2, 1);
282         listener.jvmError = mock(Error.class);
283
284         manager.submitNotification(listener, 1);
285
286         assertEquals("JVM Error caught", true, Uninterruptibles.awaitUninterruptibly(
287                                                        errorCaughtLatch, 5, TimeUnit.SECONDS));
288
289         manager.submitNotification(listener, 2);
290
291         listener.verifyNotifications();
292     }
293 }