Refactor DOMNotificationSubscriptionListener(Registry)
[mdsal.git] / dom / mdsal-dom-broker / src / test / java / org / opendaylight / mdsal / dom / broker / DOMNotificationRouterTest.java
1 /*
2  * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.mdsal.dom.broker;
9
10 import static org.junit.Assert.assertEquals;
11 import static org.junit.Assert.assertFalse;
12 import static org.junit.Assert.assertNotEquals;
13 import static org.junit.Assert.assertNotNull;
14 import static org.junit.Assert.assertSame;
15 import static org.junit.Assert.assertTrue;
16 import static org.mockito.ArgumentMatchers.any;
17 import static org.mockito.Mockito.doNothing;
18 import static org.mockito.Mockito.doReturn;
19 import static org.mockito.Mockito.mock;
20 import static org.opendaylight.mdsal.dom.broker.TestUtils.TEST_CHILD;
21
22 import com.google.common.util.concurrent.ListenableFuture;
23 import java.util.ArrayList;
24 import java.util.Collection;
25 import java.util.List;
26 import java.util.Map;
27 import java.util.concurrent.CountDownLatch;
28 import java.util.concurrent.ExecutorService;
29 import java.util.concurrent.TimeUnit;
30 import org.junit.Test;
31 import org.opendaylight.mdsal.dom.api.DOMNotification;
32 import org.opendaylight.mdsal.dom.api.DOMNotificationListener;
33 import org.opendaylight.mdsal.dom.api.DOMNotificationPublishDemandExtension;
34 import org.opendaylight.mdsal.dom.api.DOMNotificationPublishDemandExtension.DemandListener;
35 import org.opendaylight.mdsal.dom.api.DOMNotificationPublishService;
36 import org.opendaylight.yangtools.yang.common.QName;
37 import org.opendaylight.yangtools.yang.model.api.stmt.SchemaNodeIdentifier.Absolute;
38
39 public class DOMNotificationRouterTest {
40     @Test
41     public void registerNotificationListener() {
42         try (var domNotificationRouter = new DOMNotificationRouter(1024)) {
43             final var domNotificationListener = mock(DOMNotificationListener.class);
44
45             domNotificationRouter.notificationService().registerNotificationListener(domNotificationListener,
46                 List.of(Absolute.of(QName.create("urn:opendaylight:test-listener", "notif1"))));
47             assertEquals(1, domNotificationRouter.listeners().size());
48
49             domNotificationRouter.notificationService().registerNotificationListener(domNotificationListener,
50                 List.of(Absolute.of(QName.create("urn:opendaylight:test-listener", "notif2")),
51                     Absolute.of(QName.create("urn:opendaylight:test-listener", "notif3"))));
52             assertEquals(3, domNotificationRouter.listeners().size());
53         }
54     }
55
56     @Test
57     public void registerNotificationListeners() {
58         try (var domNotificationRouter = new DOMNotificationRouter(1024)) {
59             final var domNotificationListener1 = mock(DOMNotificationListener.class);
60             final var domNotificationListener2 = mock(DOMNotificationListener.class);
61
62             domNotificationRouter.notificationService().registerNotificationListeners(
63                 Map.of(Absolute.of(QName.create("urn:opendaylight:test-listener", "notif1")), domNotificationListener1,
64                     Absolute.of(QName.create("urn:opendaylight:test-listener", "notif2")), domNotificationListener2));
65             assertEquals(2, domNotificationRouter.listeners().size());
66         }
67     }
68
69     @SuppressWarnings("checkstyle:IllegalCatch")
70     @Test
71     public void complexTest() throws Exception {
72         final var demandListener = mock(DemandListener.class);
73         doNothing().when(demandListener).onDemandUpdated(any());
74
75         final var latch = new CountDownLatch(1);
76         final var domNotificationListener = new TestListener(latch);
77         final var domNotificationRouter = new DOMNotificationRouter(1024);
78         final var notifService = domNotificationRouter.notificationService();
79         final var notifPubService = domNotificationRouter.notificationPublishService();
80         final var demandExt = notifPubService.extension(DOMNotificationPublishDemandExtension.class);
81         assertNotNull(demandExt);
82
83         var listeners = domNotificationRouter.listeners();
84
85         assertTrue(listeners.isEmpty());
86         assertNotNull(notifService.registerNotificationListener(domNotificationListener,
87             List.of(Absolute.of(TestModel.TEST_QNAME))));
88         assertNotNull(notifService.registerNotificationListener(domNotificationListener,
89             List.of(Absolute.of(TestModel.TEST2_QNAME))));
90
91         listeners = domNotificationRouter.listeners();
92
93         assertFalse(listeners.isEmpty());
94
95         assertEquals(0, domNotificationRouter.demandListeners().streamListeners().count());
96
97         assertNotNull(demandExt.registerDemandListener(demandListener));
98
99         assertSame(demandListener, domNotificationRouter.demandListeners().streamListeners().findAny().orElseThrow());
100
101         final var domNotification = mock(DOMNotification.class);
102         doReturn("test").when(domNotification).toString();
103         doReturn(Absolute.of(TestModel.TEST_QNAME)).when(domNotification).getType();
104         doReturn(TEST_CHILD).when(domNotification).getBody();
105
106         assertNotNull(notifPubService.offerNotification(domNotification));
107
108         assertNotNull(notifPubService.offerNotification(domNotification, 1, TimeUnit.SECONDS));
109         assertNotNull(notifPubService.offerNotification(domNotification, 1, TimeUnit.SECONDS));
110         assertNotNull(notifPubService.putNotification(domNotification));
111     }
112
113     @Test
114     public void offerNotification() throws Exception {
115         try (var domNotificationRouter = new DOMNotificationRouter(1024)) {
116             final var domNotification = mock(DOMNotification.class);
117             doReturn(Absolute.of(TestModel.TEST_QNAME)).when(domNotification).getType();
118             doReturn(TEST_CHILD).when(domNotification).getBody();
119
120             final var notifPubService = domNotificationRouter.notificationPublishService();
121             assertNotNull(notifPubService.putNotification(domNotification));
122             assertNotNull(notifPubService.offerNotification(domNotification));
123             assertNotNull(notifPubService.offerNotification(domNotification, 1, TimeUnit.SECONDS));
124         }
125     }
126
127     @Test
128     public void testOfferNotificationWithBlocking() throws Exception {
129         final var latch = new CountDownLatch(1);
130         final var testListener = new TestListener(latch);
131         final var domNotification = mock(DOMNotification.class);
132         doReturn("test").when(domNotification).toString();
133         doReturn(Absolute.of(TestModel.TEST_QNAME)).when(domNotification).getType();
134         doReturn(TEST_CHILD).when(domNotification).getBody();
135
136         try (var testRouter = new TestRouter(1)) {
137             final var notifService = testRouter.notificationService();
138             final var notifPubService = testRouter.notificationPublishService();
139
140             assertNotNull(notifService.registerNotificationListener(testListener,
141                 List.of(Absolute.of(TestModel.TEST_QNAME))));
142             assertNotNull(notifService.registerNotificationListener(testListener,
143                 List.of(Absolute.of(TestModel.TEST2_QNAME))));
144
145             assertNotEquals(DOMNotificationPublishService.REJECTED,
146                 notifPubService.offerNotification(domNotification, 3, TimeUnit.SECONDS));
147             assertTrue("Listener was not notified", latch.await(5, TimeUnit.SECONDS));
148             assertEquals("Received notifications", 1, testListener.getReceivedNotifications().size());
149
150             assertEquals(DOMNotificationPublishService.REJECTED,
151                 notifPubService.offerNotification(domNotification, 1, TimeUnit.SECONDS));
152             assertEquals("Received notifications", 1, testListener.getReceivedNotifications().size());
153         }
154     }
155
156     @Test
157     public void close() throws Exception {
158         final ExecutorService executor;
159         final ExecutorService observer;
160
161         try (var domNotificationRouter = new DOMNotificationRouter(1024)) {
162             executor = domNotificationRouter.executor();
163             observer = domNotificationRouter.observer();
164             assertFalse(executor.isShutdown());
165             assertFalse(observer.isShutdown());
166         }
167         assertTrue(executor.isShutdown());
168         assertTrue(observer.isShutdown());
169     }
170
171     private static class TestListener implements DOMNotificationListener {
172         private final List<DOMNotification>  receivedNotifications = new ArrayList<>();
173         private final CountDownLatch latch;
174
175         TestListener(final CountDownLatch latch) {
176             this.latch = latch;
177         }
178
179         @Override
180         public void onNotification(final DOMNotification notification) {
181             receivedNotifications.add(notification);
182             latch.countDown();
183         }
184
185         List<DOMNotification> getReceivedNotifications() {
186             return receivedNotifications;
187         }
188     }
189
190     private static class TestRouter extends DOMNotificationRouter {
191
192         private boolean triggerRejected = false;
193
194         TestRouter(final int queueDepth) {
195             super(queueDepth);
196         }
197
198         @Override
199         ListenableFuture<? extends Object> publish(final DOMNotification notification,
200                 final Collection<Reg<?>> subscribers) {
201             if (triggerRejected) {
202                 return DOMNotificationPublishService.REJECTED;
203             }
204
205             triggerRejected = true;
206             return super.publish(notification, subscribers);
207         }
208
209         @Override
210         public ListenableFuture<? extends Object> putNotificationImpl(final DOMNotification notification)
211                 throws InterruptedException {
212             Thread.sleep(2000);
213             return super.putNotificationImpl(notification);
214         }
215     }
216 }