Fix eclipse/checkstyle warnings
[yangtools.git] / common / util / src / test / java / org / opendaylight / yangtools / util / concurrent / QueuedNotificationManagerTest.java
1 /*
2  * Copyright (c) 2014 Brocade Communications Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.yangtools.util.concurrent;
10
11 import static org.junit.Assert.assertEquals;
12 import static org.junit.Assert.assertTrue;
13 import static org.junit.Assert.fail;
14 import static org.mockito.Mockito.mock;
15
16 import com.google.common.base.Stopwatch;
17 import com.google.common.util.concurrent.Uninterruptibles;
18 import java.util.ArrayList;
19 import java.util.Arrays;
20 import java.util.Collection;
21 import java.util.Collections;
22 import java.util.List;
23 import java.util.concurrent.CountDownLatch;
24 import java.util.concurrent.ExecutorService;
25 import java.util.concurrent.Executors;
26 import java.util.concurrent.LinkedBlockingQueue;
27 import java.util.concurrent.ThreadPoolExecutor;
28 import java.util.concurrent.TimeUnit;
29 import org.junit.After;
30 import org.junit.Test;
31 import org.opendaylight.yangtools.util.concurrent.QueuedNotificationManager.BatchedInvoker;
32 import org.slf4j.Logger;
33 import org.slf4j.LoggerFactory;
34
35 /**
36  * Unit tests for QueuedNotificationManager.
37  *
38  * @author Thomas Pantelis
39  */
40 public class QueuedNotificationManagerTest {
41
42     static class TestListener<N> {
43
44         private final List<N> actual;
45         private volatile int expCount;
46         private volatile CountDownLatch latch;
47         volatile long sleepTime = 0;
48         volatile RuntimeException runtimeEx;
49         volatile Error jvmError;
50         boolean cacheNotifications = true;
51         String name;
52
53         TestListener(final int expCount, final int id) {
54             name = "TestListener " + id;
55             actual = Collections.synchronizedList(new ArrayList<>(expCount));
56             reset(expCount);
57         }
58
59         void reset(final int expCount) {
60             this.expCount = expCount;
61             latch = new CountDownLatch(expCount);
62             actual.clear();
63         }
64
65         void onNotification(final Collection<? extends N> data) {
66
67             try {
68                 if (sleepTime > 0) {
69                     Uninterruptibles.sleepUninterruptibly(sleepTime, TimeUnit.MILLISECONDS);
70                 }
71
72                 if (cacheNotifications) {
73                     actual.addAll(data);
74                 }
75
76                 RuntimeException localRuntimeEx = runtimeEx;
77                 if (localRuntimeEx != null) {
78                     runtimeEx = null;
79                     throw localRuntimeEx;
80                 }
81
82                 Error localJvmError = jvmError;
83                 if (localJvmError != null) {
84                     jvmError = null;
85                     throw localJvmError;
86                 }
87
88             } finally {
89                 data.forEach(action -> latch.countDown());
90             }
91         }
92
93         void verifyNotifications() {
94             boolean done = Uninterruptibles.awaitUninterruptibly(latch, 5, TimeUnit.SECONDS);
95             if (!done) {
96                 long actualCount = latch.getCount();
97                 fail(name + ": Received " + (expCount - actualCount) + " notifications. Expected " + expCount);
98             }
99         }
100
101         void verifyNotifications(final List<N> expected) {
102             verifyNotifications();
103             assertEquals(name + ": Notifications", expected, actual);
104         }
105
106         // Implement bad hashCode/equals methods to verify it doesn't screw up the
107         // QueuedNotificationManager as it should use reference identity.
108         @Override
109         public int hashCode() {
110             return 1;
111         }
112
113         @Override
114         public boolean equals(final Object obj) {
115             TestListener<?> other = (TestListener<?>) obj;
116             return other != null;
117         }
118     }
119
120     static class TestListener2<N> extends TestListener<N> {
121         TestListener2(final int expCount, final int id) {
122             super(expCount, id);
123         }
124     }
125
126     static class TestListener3<N> extends TestListener<N> {
127         TestListener3(final int expCount, final int id) {
128             super(expCount, id);
129         }
130     }
131
132     static class TestNotifier<N> implements BatchedInvoker<TestListener<N>, N> {
133         @Override
134         public void invokeListener(final TestListener<N> listener, final Collection<? extends N> notifications) {
135             listener.onNotification(notifications);
136         }
137     }
138
139     private static final Logger LOG = LoggerFactory.getLogger(QueuedNotificationManagerTest.class);
140     private ExecutorService queueExecutor;
141
142     @After
143     public void tearDown() {
144         if (queueExecutor != null) {
145             queueExecutor.shutdownNow();
146         }
147     }
148
149     @Test(timeout = 10000)
150     public void testNotificationsWithSingleListener() {
151
152         queueExecutor = Executors.newFixedThreadPool(2);
153         NotificationManager<TestListener<Integer>, Integer> manager = QueuedNotificationManager.create(queueExecutor,
154                 new TestNotifier<>(), 10, "TestMgr");
155
156         int count = 100;
157
158         TestListener<Integer> listener = new TestListener<>(count, 1);
159         listener.sleepTime = 20;
160
161         manager.submitNotifications(listener, Arrays.asList(1, 2));
162         manager.submitNotification(listener, 3);
163         manager.submitNotifications(listener, Arrays.asList(4, 5));
164         manager.submitNotification(listener, 6);
165
166         manager.submitNotifications(null, Collections.emptyList());
167         manager.submitNotifications(listener, null);
168         manager.submitNotification(listener, null);
169
170         Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
171
172         listener.sleepTime = 0;
173
174         List<Integer> expNotifications = new ArrayList<>(count);
175         expNotifications.addAll(Arrays.asList(1, 2, 3, 4, 5, 6));
176         int initialCount = 6;
177         for (int i = 1; i <= count - initialCount; i++) {
178             Integer val = Integer.valueOf(initialCount + i);
179             expNotifications.add(val);
180             manager.submitNotification(listener, val);
181         }
182
183         listener.verifyNotifications(expNotifications);
184     }
185
186     @Test
187     public void testNotificationsWithMultipleListeners() throws InterruptedException {
188
189         int count = 10;
190         queueExecutor = Executors.newFixedThreadPool(count);
191         final ExecutorService stagingExecutor = Executors.newFixedThreadPool(count);
192         final NotificationManager<TestListener<Integer>, Integer> manager = QueuedNotificationManager.create(
193                 queueExecutor, new TestNotifier<>(), 5000, "TestMgr");
194
195         final int nNotifications = 100000;
196
197         LOG.info("Testing {} listeners with {} notifications each...", count, nNotifications);
198
199         final Integer[] notifications = new Integer[nNotifications];
200         for (int i = 1; i <= nNotifications; i++) {
201             notifications[i - 1] = Integer.valueOf(i);
202         }
203
204         Stopwatch stopWatch = Stopwatch.createStarted();
205
206         List<TestListener<Integer>> listeners = new ArrayList<>();
207         List<Thread> threads = new ArrayList<>();
208         for (int i = 1; i <= count; i++) {
209             final TestListener<Integer> listener =
210                     i == 2 ? new TestListener2<>(nNotifications, i) :
211                     i == 3 ? new TestListener3<>(nNotifications, i) :
212                             new TestListener<>(nNotifications, i);
213             listeners.add(listener);
214
215             final Thread t = new Thread(() -> {
216                 for (int j = 1; j <= nNotifications; j++) {
217                     final Integer n = notifications[j - 1];
218                     stagingExecutor.execute(() -> manager.submitNotification(listener, n));
219                 }
220             });
221
222             t.start();
223             threads.add(t);
224
225         }
226
227         try {
228             for (TestListener<Integer> listener: listeners) {
229                 listener.verifyNotifications();
230                 LOG.info("{} succeeded", listener.name);
231             }
232         } finally {
233             stagingExecutor.shutdownNow();
234         }
235
236         stopWatch.stop();
237
238         LOG.info("Elapsed time: {}", stopWatch);
239         LOG.info("Executor: {}", queueExecutor);
240
241         for (Thread t : threads) {
242             t.join();
243         }
244     }
245
246     @Test(timeout = 10000)
247     public void testNotificationsWithListenerRuntimeEx() {
248
249         queueExecutor = Executors.newFixedThreadPool(1);
250         NotificationManager<TestListener<Integer>, Integer> manager = QueuedNotificationManager.create(queueExecutor,
251             new TestNotifier<>(), 10, "TestMgr");
252
253         TestListener<Integer> listener = new TestListener<>(2, 1);
254         final RuntimeException mockedRuntimeException = new RuntimeException("mock");
255         listener.runtimeEx = mockedRuntimeException;
256
257         manager.submitNotification(listener, 1);
258         manager.submitNotification(listener, 2);
259
260         listener.verifyNotifications();
261         List<Runnable> tasks = queueExecutor.shutdownNow();
262         assertTrue(tasks.isEmpty());
263     }
264
265     @Test(timeout = 10000)
266     public void testNotificationsWithListenerJVMError() {
267
268         final CountDownLatch errorCaughtLatch = new CountDownLatch(1);
269         queueExecutor = new ThreadPoolExecutor(1, 1, 0, TimeUnit.SECONDS, new LinkedBlockingQueue<>()) {
270             @Override
271             @SuppressWarnings("checkstyle:illegalCatch")
272             public void execute(final Runnable command) {
273                 super.execute(() -> {
274                     try {
275                         command.run();
276                     } catch (Error e) {
277                         errorCaughtLatch.countDown();
278                     }
279                 });
280             }
281         };
282
283         NotificationManager<TestListener<Integer>, Integer> manager = QueuedNotificationManager.create(queueExecutor,
284                 new TestNotifier<>(), 10, "TestMgr");
285
286         TestListener<Integer> listener = new TestListener<>(2, 1);
287         listener.jvmError = mock(Error.class);
288
289         manager.submitNotification(listener, 1);
290
291         assertTrue("JVM Error caught", Uninterruptibles.awaitUninterruptibly(errorCaughtLatch, 5, TimeUnit.SECONDS));
292
293         manager.submitNotification(listener, 2);
294
295         listener.verifyNotifications();
296         List<Runnable> tasks = queueExecutor.shutdownNow();
297         assertTrue(tasks.isEmpty());
298     }
299 }