Move AbstractDOMDataBroker to mdsal-dom-spi
[mdsal.git] / dom / mdsal-dom-broker / src / test / java / org / opendaylight / mdsal / dom / broker / DOMNotificationRouterTest.java
index e435d33c3f1c9814cb00945da1e3e3f7acb5f9d7..cdd54ce4af0d94bccb8e3a76c37e8c74a7e9fa4c 100644 (file)
@@ -11,175 +11,206 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotEquals;
 import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertSame;
 import static org.junit.Assert.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doNothing;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
+import static org.opendaylight.mdsal.dom.broker.TestUtils.TEST_CHILD;
 
-import com.google.common.collect.Multimap;
 import com.google.common.util.concurrent.ListenableFuture;
-import com.lmax.disruptor.PhasedBackoffWaitStrategy;
-import com.lmax.disruptor.WaitStrategy;
-
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
+import java.util.Map;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
-import javax.annotation.Nonnull;
-
 import org.junit.Test;
 import org.opendaylight.mdsal.dom.api.DOMNotification;
 import org.opendaylight.mdsal.dom.api.DOMNotificationListener;
+import org.opendaylight.mdsal.dom.api.DOMNotificationPublishDemandExtension;
+import org.opendaylight.mdsal.dom.api.DOMNotificationPublishDemandExtension.DemandListener;
 import org.opendaylight.mdsal.dom.api.DOMNotificationPublishService;
-import org.opendaylight.mdsal.dom.spi.DOMNotificationSubscriptionListener;
-import org.opendaylight.yangtools.concepts.ListenerRegistration;
-import org.opendaylight.yangtools.util.ListenerRegistry;
-import org.opendaylight.yangtools.yang.model.api.SchemaPath;
-
-public class DOMNotificationRouterTest extends TestUtils {
+import org.opendaylight.yangtools.yang.common.QName;
+import org.opendaylight.yangtools.yang.model.api.stmt.SchemaNodeIdentifier.Absolute;
 
-    private static final WaitStrategy DEFAULT_STRATEGY = PhasedBackoffWaitStrategy.withLock(
-            1L, 30L, TimeUnit.MILLISECONDS);
+public class DOMNotificationRouterTest {
+    @Test
+    public void registerNotificationListener() {
+        try (var domNotificationRouter = new DOMNotificationRouter(1024)) {
+            final var domNotificationListener = mock(DOMNotificationListener.class);
+
+            domNotificationRouter.notificationService().registerNotificationListener(domNotificationListener,
+                List.of(Absolute.of(QName.create("urn:opendaylight:test-listener", "notif1"))));
+            assertEquals(1, domNotificationRouter.listeners().size());
+
+            domNotificationRouter.notificationService().registerNotificationListener(domNotificationListener,
+                List.of(Absolute.of(QName.create("urn:opendaylight:test-listener", "notif2")),
+                    Absolute.of(QName.create("urn:opendaylight:test-listener", "notif3"))));
+            assertEquals(3, domNotificationRouter.listeners().size());
+        }
+    }
 
     @Test
-    public void create() throws Exception {
-        assertNotNull(DOMNotificationRouter.create(1,1,1,TimeUnit.SECONDS));
-        assertNotNull(DOMNotificationRouter.create(1));
+    public void registerNotificationListeners() {
+        try (var domNotificationRouter = new DOMNotificationRouter(1024)) {
+            final var domNotificationListener1 = mock(DOMNotificationListener.class);
+            final var domNotificationListener2 = mock(DOMNotificationListener.class);
+
+            domNotificationRouter.notificationService().registerNotificationListeners(
+                Map.of(Absolute.of(QName.create("urn:opendaylight:test-listener", "notif1")), domNotificationListener1,
+                    Absolute.of(QName.create("urn:opendaylight:test-listener", "notif2")), domNotificationListener2));
+            assertEquals(2, domNotificationRouter.listeners().size());
+        }
     }
 
     @SuppressWarnings("checkstyle:IllegalCatch")
     @Test
     public void complexTest() throws Exception {
-        final DOMNotificationSubscriptionListener domNotificationSubscriptionListener =
-                mock(DOMNotificationSubscriptionListener.class);
-        final CountDownLatch latch = new CountDownLatch(1);
-        final DOMNotificationListener domNotificationListener = new TestListener(latch);
-        final DOMNotificationRouter domNotificationRouter = DOMNotificationRouter.create(1);
+        final var demandListener = mock(DemandListener.class);
+        doNothing().when(demandListener).onDemandUpdated(any());
+
+        final var latch = new CountDownLatch(1);
+        final var domNotificationListener = new TestListener(latch);
+        final var domNotificationRouter = new DOMNotificationRouter(1024);
+        final var notifService = domNotificationRouter.notificationService();
+        final var notifPubService = domNotificationRouter.notificationPublishService();
+        final var demandExt = notifPubService.extension(DOMNotificationPublishDemandExtension.class);
+        assertNotNull(demandExt);
 
-        Multimap<SchemaPath, ?> listeners = domNotificationRouter.listeners();
+        var listeners = domNotificationRouter.listeners();
 
         assertTrue(listeners.isEmpty());
-        assertNotNull(domNotificationRouter.registerNotificationListener(domNotificationListener, SchemaPath.ROOT));
-        assertNotNull(domNotificationRouter.registerNotificationListener(domNotificationListener, SchemaPath.SAME));
+        assertNotNull(notifService.registerNotificationListener(domNotificationListener,
+            List.of(Absolute.of(TestModel.TEST_QNAME))));
+        assertNotNull(notifService.registerNotificationListener(domNotificationListener,
+            List.of(Absolute.of(TestModel.TEST2_QNAME))));
 
         listeners = domNotificationRouter.listeners();
 
         assertFalse(listeners.isEmpty());
 
-        ListenerRegistry<DOMNotificationSubscriptionListener> subscriptionListeners =
-                domNotificationRouter.subscriptionListeners();
+        assertEquals(0, domNotificationRouter.demandListeners().streamObjects().count());
 
-        assertFalse(subscriptionListeners.iterator().hasNext());
-        assertNotNull(domNotificationRouter.registerSubscriptionListener(domNotificationSubscriptionListener));
+        assertNotNull(demandExt.registerDemandListener(demandListener));
 
-        subscriptionListeners = domNotificationRouter.subscriptionListeners();
-        assertTrue(subscriptionListeners.iterator().hasNext());
+        assertSame(demandListener, domNotificationRouter.demandListeners().streamObjects().findAny().orElseThrow());
 
-        final DOMNotification domNotification = mock(DOMNotification.class);
+        final var domNotification = mock(DOMNotification.class);
         doReturn("test").when(domNotification).toString();
-        doReturn(SchemaPath.ROOT).when(domNotification).getType();
+        doReturn(Absolute.of(TestModel.TEST_QNAME)).when(domNotification).getType();
         doReturn(TEST_CHILD).when(domNotification).getBody();
 
-        assertNotNull(domNotificationRouter.offerNotification(domNotification));
-
-        try {
-            assertNotNull(domNotificationRouter.offerNotification(domNotification, 1, TimeUnit.SECONDS));
-            assertNotNull(domNotificationRouter.offerNotification(domNotification, 1, TimeUnit.SECONDS));
-        } catch (Exception e) {
-            assertTrue(e instanceof UnsupportedOperationException);
-        }
+        assertNotNull(notifPubService.offerNotification(domNotification));
 
-        assertNotNull(domNotificationRouter.putNotification(domNotification));
+        assertNotNull(notifPubService.offerNotification(domNotification, 1, TimeUnit.SECONDS));
+        assertNotNull(notifPubService.offerNotification(domNotification, 1, TimeUnit.SECONDS));
+        assertNotNull(notifPubService.putNotification(domNotification));
     }
 
     @Test
     public void offerNotification() throws Exception {
-        final DOMNotificationRouter domNotificationRouter = DOMNotificationRouter.create(1);
-        final DOMNotification domNotification = mock(DOMNotification.class);
-        doReturn(SchemaPath.ROOT).when(domNotification).getType();
-        doReturn(TEST_CHILD).when(domNotification).getBody();
-        assertNotNull(domNotificationRouter.putNotification(domNotification));
-        assertNotNull(domNotificationRouter.offerNotification(domNotification));
-        assertNotNull(domNotificationRouter.offerNotification(domNotification, 1, TimeUnit.SECONDS));
+        try (var domNotificationRouter = new DOMNotificationRouter(1024)) {
+            final var domNotification = mock(DOMNotification.class);
+            doReturn(Absolute.of(TestModel.TEST_QNAME)).when(domNotification).getType();
+            doReturn(TEST_CHILD).when(domNotification).getBody();
+
+            final var notifPubService = domNotificationRouter.notificationPublishService();
+            assertNotNull(notifPubService.putNotification(domNotification));
+            assertNotNull(notifPubService.offerNotification(domNotification));
+            assertNotNull(notifPubService.offerNotification(domNotification, 1, TimeUnit.SECONDS));
+        }
     }
 
     @Test
     public void testOfferNotificationWithBlocking() throws Exception {
-        final CountDownLatch latch = new CountDownLatch(1);
-        final TestListener testListener = new TestListener(latch);
-        final DOMNotification domNotification = mock(DOMNotification.class);
+        final var latch = new CountDownLatch(1);
+        final var testListener = new TestListener(latch);
+        final var domNotification = mock(DOMNotification.class);
         doReturn("test").when(domNotification).toString();
-        doReturn(SchemaPath.ROOT).when(domNotification).getType();
+        doReturn(Absolute.of(TestModel.TEST_QNAME)).when(domNotification).getType();
         doReturn(TEST_CHILD).when(domNotification).getBody();
-        final TestRouter testRouter = TestRouter.create(1);
-        assertNotNull(testRouter.registerNotificationListener(testListener, SchemaPath.ROOT));
-        assertNotNull(testRouter.registerNotificationListener(testListener, SchemaPath.SAME));
 
-        assertNotEquals(DOMNotificationPublishService.REJECTED,
-                testRouter.offerNotification(domNotification, 3, TimeUnit.SECONDS));
-        assertTrue("Listener was not notified", latch.await(5, TimeUnit.SECONDS));
-        assertEquals("Received notifications", 1, testListener.getReceivedNotifications().size());
+        try (var testRouter = new TestRouter(1)) {
+            final var notifService = testRouter.notificationService();
+            final var notifPubService = testRouter.notificationPublishService();
 
-        assertEquals(DOMNotificationPublishService.REJECTED,
-                testRouter.offerNotification(domNotification, 1, TimeUnit.SECONDS));
-        assertEquals("Received notifications", 1, testListener.getReceivedNotifications().size());
+            assertNotNull(notifService.registerNotificationListener(testListener,
+                List.of(Absolute.of(TestModel.TEST_QNAME))));
+            assertNotNull(notifService.registerNotificationListener(testListener,
+                List.of(Absolute.of(TestModel.TEST2_QNAME))));
 
+            assertNotEquals(DOMNotificationPublishService.REJECTED,
+                notifPubService.offerNotification(domNotification, 3, TimeUnit.SECONDS));
+            assertTrue("Listener was not notified", latch.await(5, TimeUnit.SECONDS));
+            assertEquals("Received notifications", 1, testListener.getReceivedNotifications().size());
+
+            assertEquals(DOMNotificationPublishService.REJECTED,
+                notifPubService.offerNotification(domNotification, 1, TimeUnit.SECONDS));
+            assertEquals("Received notifications", 1, testListener.getReceivedNotifications().size());
+        }
     }
 
     @Test
     public void close() throws Exception {
-        final DOMNotificationRouter domNotificationRouter = DOMNotificationRouter.create(1);
-
-        final ExecutorService executor = domNotificationRouter.executor();
-
-        assertFalse(executor.isShutdown());
-        domNotificationRouter.close();
+        final ExecutorService executor;
+        final ExecutorService observer;
+
+        try (var domNotificationRouter = new DOMNotificationRouter(1024)) {
+            executor = domNotificationRouter.executor();
+            observer = domNotificationRouter.observer();
+            assertFalse(executor.isShutdown());
+            assertFalse(observer.isShutdown());
+        }
         assertTrue(executor.isShutdown());
+        assertTrue(observer.isShutdown());
     }
 
     private static class TestListener implements DOMNotificationListener {
-        private final CountDownLatch latch;
         private final List<DOMNotification>  receivedNotifications = new ArrayList<>();
+        private final CountDownLatch latch;
 
         TestListener(final CountDownLatch latch) {
             this.latch = latch;
         }
 
         @Override
-        public void onNotification(@Nonnull final DOMNotification notification) {
+        public void onNotification(final DOMNotification notification) {
             receivedNotifications.add(notification);
             latch.countDown();
         }
 
-        public List<DOMNotification> getReceivedNotifications() {
+        List<DOMNotification> getReceivedNotifications() {
             return receivedNotifications;
         }
     }
 
     private static class TestRouter extends DOMNotificationRouter {
 
-        TestRouter(ExecutorService executor, int queueDepth, WaitStrategy strategy) {
-            super(executor, queueDepth, strategy);
+        private boolean triggerRejected = false;
+
+        TestRouter(final int queueDepth) {
+            super(queueDepth);
         }
 
-        protected ListenableFuture<? extends Object> tryPublish(final DOMNotification notification,
-                final Collection<ListenerRegistration<? extends DOMNotificationListener>> subscribers) {
-            return DOMNotificationPublishService.REJECTED;
+        @Override
+        ListenableFuture<? extends Object> publish(final DOMNotification notification,
+                final Collection<Reg> subscribers) {
+            if (triggerRejected) {
+                return DOMNotificationPublishService.REJECTED;
+            }
+
+            triggerRejected = true;
+            return super.publish(notification, subscribers);
         }
 
         @Override
-        public ListenableFuture<? extends Object> putNotification(final DOMNotification notification)
+        public ListenableFuture<? extends Object> putNotificationImpl(final DOMNotification notification)
                 throws InterruptedException {
             Thread.sleep(2000);
-            return super.putNotification(notification);
-        }
-
-        public static TestRouter create(int queueDepth) {
-            final ExecutorService executor = Executors.newCachedThreadPool();
-
-            return new TestRouter(executor, queueDepth, DEFAULT_STRATEGY);
+            return super.putNotificationImpl(notification);
         }
     }
 }