import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
+import static org.opendaylight.mdsal.dom.broker.TestUtils.TEST_CHILD;
-import com.google.common.collect.Multimap;
import com.google.common.util.concurrent.ListenableFuture;
-import com.lmax.disruptor.PhasedBackoffWaitStrategy;
-import com.lmax.disruptor.WaitStrategy;
-
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
+import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
-import javax.annotation.Nonnull;
-
import org.junit.Test;
import org.opendaylight.mdsal.dom.api.DOMNotification;
import org.opendaylight.mdsal.dom.api.DOMNotificationListener;
+import org.opendaylight.mdsal.dom.api.DOMNotificationPublishDemandExtension;
+import org.opendaylight.mdsal.dom.api.DOMNotificationPublishDemandExtension.DemandListener;
import org.opendaylight.mdsal.dom.api.DOMNotificationPublishService;
-import org.opendaylight.mdsal.dom.spi.DOMNotificationSubscriptionListener;
-import org.opendaylight.yangtools.concepts.ListenerRegistration;
-import org.opendaylight.yangtools.util.ListenerRegistry;
-import org.opendaylight.yangtools.yang.model.api.SchemaPath;
-
-public class DOMNotificationRouterTest extends TestUtils {
+import org.opendaylight.yangtools.yang.common.QName;
+import org.opendaylight.yangtools.yang.model.api.stmt.SchemaNodeIdentifier.Absolute;
- private static final WaitStrategy DEFAULT_STRATEGY = PhasedBackoffWaitStrategy.withLock(
- 1L, 30L, TimeUnit.MILLISECONDS);
+public class DOMNotificationRouterTest {
+ @Test
+ public void registerNotificationListener() {
+ try (var domNotificationRouter = new DOMNotificationRouter(1024)) {
+ final var domNotificationListener = mock(DOMNotificationListener.class);
+
+ domNotificationRouter.notificationService().registerNotificationListener(domNotificationListener,
+ List.of(Absolute.of(QName.create("urn:opendaylight:test-listener", "notif1"))));
+ assertEquals(1, domNotificationRouter.listeners().size());
+
+ domNotificationRouter.notificationService().registerNotificationListener(domNotificationListener,
+ List.of(Absolute.of(QName.create("urn:opendaylight:test-listener", "notif2")),
+ Absolute.of(QName.create("urn:opendaylight:test-listener", "notif3"))));
+ assertEquals(3, domNotificationRouter.listeners().size());
+ }
+ }
@Test
- public void create() throws Exception {
- assertNotNull(DOMNotificationRouter.create(1,1,1,TimeUnit.SECONDS));
- assertNotNull(DOMNotificationRouter.create(1));
+ public void registerNotificationListeners() {
+ try (var domNotificationRouter = new DOMNotificationRouter(1024)) {
+ final var domNotificationListener1 = mock(DOMNotificationListener.class);
+ final var domNotificationListener2 = mock(DOMNotificationListener.class);
+
+ domNotificationRouter.notificationService().registerNotificationListeners(
+ Map.of(Absolute.of(QName.create("urn:opendaylight:test-listener", "notif1")), domNotificationListener1,
+ Absolute.of(QName.create("urn:opendaylight:test-listener", "notif2")), domNotificationListener2));
+ assertEquals(2, domNotificationRouter.listeners().size());
+ }
}
@SuppressWarnings("checkstyle:IllegalCatch")
@Test
public void complexTest() throws Exception {
- final DOMNotificationSubscriptionListener domNotificationSubscriptionListener =
- mock(DOMNotificationSubscriptionListener.class);
- final CountDownLatch latch = new CountDownLatch(1);
- final DOMNotificationListener domNotificationListener = new TestListener(latch);
- final DOMNotificationRouter domNotificationRouter = DOMNotificationRouter.create(1);
+ final var demandListener = mock(DemandListener.class);
+ doNothing().when(demandListener).onDemandUpdated(any());
+
+ final var latch = new CountDownLatch(1);
+ final var domNotificationListener = new TestListener(latch);
+ final var domNotificationRouter = new DOMNotificationRouter(1024);
+ final var notifService = domNotificationRouter.notificationService();
+ final var notifPubService = domNotificationRouter.notificationPublishService();
+ final var demandExt = notifPubService.extension(DOMNotificationPublishDemandExtension.class);
+ assertNotNull(demandExt);
- Multimap<SchemaPath, ?> listeners = domNotificationRouter.listeners();
+ var listeners = domNotificationRouter.listeners();
assertTrue(listeners.isEmpty());
- assertNotNull(domNotificationRouter.registerNotificationListener(domNotificationListener, SchemaPath.ROOT));
- assertNotNull(domNotificationRouter.registerNotificationListener(domNotificationListener, SchemaPath.SAME));
+ assertNotNull(notifService.registerNotificationListener(domNotificationListener,
+ List.of(Absolute.of(TestModel.TEST_QNAME))));
+ assertNotNull(notifService.registerNotificationListener(domNotificationListener,
+ List.of(Absolute.of(TestModel.TEST2_QNAME))));
listeners = domNotificationRouter.listeners();
assertFalse(listeners.isEmpty());
- ListenerRegistry<DOMNotificationSubscriptionListener> subscriptionListeners =
- domNotificationRouter.subscriptionListeners();
+ assertEquals(0, domNotificationRouter.demandListeners().streamObjects().count());
- assertFalse(subscriptionListeners.iterator().hasNext());
- assertNotNull(domNotificationRouter.registerSubscriptionListener(domNotificationSubscriptionListener));
+ assertNotNull(demandExt.registerDemandListener(demandListener));
- subscriptionListeners = domNotificationRouter.subscriptionListeners();
- assertTrue(subscriptionListeners.iterator().hasNext());
+ assertSame(demandListener, domNotificationRouter.demandListeners().streamObjects().findAny().orElseThrow());
- final DOMNotification domNotification = mock(DOMNotification.class);
+ final var domNotification = mock(DOMNotification.class);
doReturn("test").when(domNotification).toString();
- doReturn(SchemaPath.ROOT).when(domNotification).getType();
+ doReturn(Absolute.of(TestModel.TEST_QNAME)).when(domNotification).getType();
doReturn(TEST_CHILD).when(domNotification).getBody();
- assertNotNull(domNotificationRouter.offerNotification(domNotification));
-
- try {
- assertNotNull(domNotificationRouter.offerNotification(domNotification, 1, TimeUnit.SECONDS));
- assertNotNull(domNotificationRouter.offerNotification(domNotification, 1, TimeUnit.SECONDS));
- } catch (Exception e) {
- assertTrue(e instanceof UnsupportedOperationException);
- }
+ assertNotNull(notifPubService.offerNotification(domNotification));
- assertNotNull(domNotificationRouter.putNotification(domNotification));
+ assertNotNull(notifPubService.offerNotification(domNotification, 1, TimeUnit.SECONDS));
+ assertNotNull(notifPubService.offerNotification(domNotification, 1, TimeUnit.SECONDS));
+ assertNotNull(notifPubService.putNotification(domNotification));
}
@Test
public void offerNotification() throws Exception {
- final DOMNotificationRouter domNotificationRouter = DOMNotificationRouter.create(1);
- final DOMNotification domNotification = mock(DOMNotification.class);
- doReturn(SchemaPath.ROOT).when(domNotification).getType();
- doReturn(TEST_CHILD).when(domNotification).getBody();
- assertNotNull(domNotificationRouter.putNotification(domNotification));
- assertNotNull(domNotificationRouter.offerNotification(domNotification));
- assertNotNull(domNotificationRouter.offerNotification(domNotification, 1, TimeUnit.SECONDS));
+ try (var domNotificationRouter = new DOMNotificationRouter(1024)) {
+ final var domNotification = mock(DOMNotification.class);
+ doReturn(Absolute.of(TestModel.TEST_QNAME)).when(domNotification).getType();
+ doReturn(TEST_CHILD).when(domNotification).getBody();
+
+ final var notifPubService = domNotificationRouter.notificationPublishService();
+ assertNotNull(notifPubService.putNotification(domNotification));
+ assertNotNull(notifPubService.offerNotification(domNotification));
+ assertNotNull(notifPubService.offerNotification(domNotification, 1, TimeUnit.SECONDS));
+ }
}
@Test
public void testOfferNotificationWithBlocking() throws Exception {
- final CountDownLatch latch = new CountDownLatch(1);
- final TestListener testListener = new TestListener(latch);
- final DOMNotification domNotification = mock(DOMNotification.class);
+ final var latch = new CountDownLatch(1);
+ final var testListener = new TestListener(latch);
+ final var domNotification = mock(DOMNotification.class);
doReturn("test").when(domNotification).toString();
- doReturn(SchemaPath.ROOT).when(domNotification).getType();
+ doReturn(Absolute.of(TestModel.TEST_QNAME)).when(domNotification).getType();
doReturn(TEST_CHILD).when(domNotification).getBody();
- final TestRouter testRouter = TestRouter.create(1);
- assertNotNull(testRouter.registerNotificationListener(testListener, SchemaPath.ROOT));
- assertNotNull(testRouter.registerNotificationListener(testListener, SchemaPath.SAME));
- assertNotEquals(DOMNotificationPublishService.REJECTED,
- testRouter.offerNotification(domNotification, 3, TimeUnit.SECONDS));
- assertTrue("Listener was not notified", latch.await(5, TimeUnit.SECONDS));
- assertEquals("Received notifications", 1, testListener.getReceivedNotifications().size());
+ try (var testRouter = new TestRouter(1)) {
+ final var notifService = testRouter.notificationService();
+ final var notifPubService = testRouter.notificationPublishService();
- assertEquals(DOMNotificationPublishService.REJECTED,
- testRouter.offerNotification(domNotification, 1, TimeUnit.SECONDS));
- assertEquals("Received notifications", 1, testListener.getReceivedNotifications().size());
+ assertNotNull(notifService.registerNotificationListener(testListener,
+ List.of(Absolute.of(TestModel.TEST_QNAME))));
+ assertNotNull(notifService.registerNotificationListener(testListener,
+ List.of(Absolute.of(TestModel.TEST2_QNAME))));
+ assertNotEquals(DOMNotificationPublishService.REJECTED,
+ notifPubService.offerNotification(domNotification, 3, TimeUnit.SECONDS));
+ assertTrue("Listener was not notified", latch.await(5, TimeUnit.SECONDS));
+ assertEquals("Received notifications", 1, testListener.getReceivedNotifications().size());
+
+ assertEquals(DOMNotificationPublishService.REJECTED,
+ notifPubService.offerNotification(domNotification, 1, TimeUnit.SECONDS));
+ assertEquals("Received notifications", 1, testListener.getReceivedNotifications().size());
+ }
}
@Test
public void close() throws Exception {
- final DOMNotificationRouter domNotificationRouter = DOMNotificationRouter.create(1);
-
- final ExecutorService executor = domNotificationRouter.executor();
-
- assertFalse(executor.isShutdown());
- domNotificationRouter.close();
+ final ExecutorService executor;
+ final ExecutorService observer;
+
+ try (var domNotificationRouter = new DOMNotificationRouter(1024)) {
+ executor = domNotificationRouter.executor();
+ observer = domNotificationRouter.observer();
+ assertFalse(executor.isShutdown());
+ assertFalse(observer.isShutdown());
+ }
assertTrue(executor.isShutdown());
+ assertTrue(observer.isShutdown());
}
private static class TestListener implements DOMNotificationListener {
- private final CountDownLatch latch;
private final List<DOMNotification> receivedNotifications = new ArrayList<>();
+ private final CountDownLatch latch;
TestListener(final CountDownLatch latch) {
this.latch = latch;
}
@Override
- public void onNotification(@Nonnull final DOMNotification notification) {
+ public void onNotification(final DOMNotification notification) {
receivedNotifications.add(notification);
latch.countDown();
}
- public List<DOMNotification> getReceivedNotifications() {
+ List<DOMNotification> getReceivedNotifications() {
return receivedNotifications;
}
}
private static class TestRouter extends DOMNotificationRouter {
- TestRouter(ExecutorService executor, int queueDepth, WaitStrategy strategy) {
- super(executor, queueDepth, strategy);
+ private boolean triggerRejected = false;
+
+ TestRouter(final int queueDepth) {
+ super(queueDepth);
}
- protected ListenableFuture<? extends Object> tryPublish(final DOMNotification notification,
- final Collection<ListenerRegistration<? extends DOMNotificationListener>> subscribers) {
- return DOMNotificationPublishService.REJECTED;
+ @Override
+ ListenableFuture<? extends Object> publish(final DOMNotification notification,
+ final Collection<Reg> subscribers) {
+ if (triggerRejected) {
+ return DOMNotificationPublishService.REJECTED;
+ }
+
+ triggerRejected = true;
+ return super.publish(notification, subscribers);
}
@Override
- public ListenableFuture<? extends Object> putNotification(final DOMNotification notification)
+ public ListenableFuture<? extends Object> putNotificationImpl(final DOMNotification notification)
throws InterruptedException {
Thread.sleep(2000);
- return super.putNotification(notification);
- }
-
- public static TestRouter create(int queueDepth) {
- final ExecutorService executor = Executors.newCachedThreadPool();
-
- return new TestRouter(executor, queueDepth, DEFAULT_STRATEGY);
+ return super.putNotificationImpl(notification);
}
}
}