*/
package org.opendaylight.mdsal.dom.broker;
+import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
+import static org.opendaylight.mdsal.dom.broker.TestUtils.TEST_CHILD;
import com.google.common.collect.Multimap;
-import java.lang.reflect.Field;
+import com.google.common.util.concurrent.ListenableFuture;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
-import javax.annotation.Nonnull;
import org.junit.Test;
import org.opendaylight.mdsal.dom.api.DOMNotification;
import org.opendaylight.mdsal.dom.api.DOMNotificationListener;
+import org.opendaylight.mdsal.dom.api.DOMNotificationPublishService;
import org.opendaylight.mdsal.dom.spi.DOMNotificationSubscriptionListener;
-import org.opendaylight.yangtools.concepts.ListenerRegistration;
+import org.opendaylight.yangtools.concepts.AbstractListenerRegistration;
import org.opendaylight.yangtools.util.ListenerRegistry;
-import org.opendaylight.yangtools.yang.model.api.SchemaPath;
+import org.opendaylight.yangtools.yang.model.api.stmt.SchemaNodeIdentifier.Absolute;
-public class DOMNotificationRouterTest extends TestUtils {
+public class DOMNotificationRouterTest {
@Test
public void create() throws Exception {
- assertNotNull(DOMNotificationRouter.create(1,1,1,TimeUnit.SECONDS));
- assertNotNull(DOMNotificationRouter.create(1));
+ assertNotNull(DOMNotificationRouter.create(1024));
}
+ @SuppressWarnings("checkstyle:IllegalCatch")
@Test
public void complexTest() throws Exception {
final DOMNotificationSubscriptionListener domNotificationSubscriptionListener =
mock(DOMNotificationSubscriptionListener.class);
- final DOMNotificationListener domNotificationListener = new TestListener();
- final DOMNotificationRouter domNotificationRouter = DOMNotificationRouter.create(1);
+ doNothing().when(domNotificationSubscriptionListener).onSubscriptionChanged(any());
- final Field listenersField = DOMNotificationRouter.class.getDeclaredField("listeners");
- listenersField.setAccessible(true);
+ final CountDownLatch latch = new CountDownLatch(1);
+ final DOMNotificationListener domNotificationListener = new TestListener(latch);
+ final DOMNotificationRouter domNotificationRouter = DOMNotificationRouter.create(1024);
- final Field subscriptionListenersField = DOMNotificationRouter.class.getDeclaredField("subscriptionListeners");
- subscriptionListenersField.setAccessible(true);
-
- Multimap<SchemaPath, ListenerRegistration<? extends DOMNotificationListener>> listeners =
- (Multimap<SchemaPath, ListenerRegistration<? extends DOMNotificationListener>>)
- listenersField.get(domNotificationRouter);
+ Multimap<Absolute, ?> listeners = domNotificationRouter.listeners();
assertTrue(listeners.isEmpty());
- assertNotNull(domNotificationRouter.registerNotificationListener(domNotificationListener, SchemaPath.ROOT));
- assertNotNull(domNotificationRouter.registerNotificationListener(domNotificationListener, SchemaPath.SAME));
+ assertNotNull(domNotificationRouter.registerNotificationListener(domNotificationListener,
+ Absolute.of(TestModel.TEST_QNAME)));
+ assertNotNull(domNotificationRouter.registerNotificationListener(domNotificationListener,
+ Absolute.of(TestModel.TEST2_QNAME)));
- listeners = (Multimap<SchemaPath, ListenerRegistration<? extends DOMNotificationListener>>)
- listenersField.get(domNotificationRouter);
+ listeners = domNotificationRouter.listeners();
assertFalse(listeners.isEmpty());
ListenerRegistry<DOMNotificationSubscriptionListener> subscriptionListeners =
- (ListenerRegistry<DOMNotificationSubscriptionListener>)
- subscriptionListenersField.get(domNotificationRouter);
+ domNotificationRouter.subscriptionListeners();
- assertFalse(subscriptionListeners.iterator().hasNext());
+ assertEquals(0, subscriptionListeners.streamListeners().count());
assertNotNull(domNotificationRouter.registerSubscriptionListener(domNotificationSubscriptionListener));
- subscriptionListeners = (ListenerRegistry<DOMNotificationSubscriptionListener>)
- subscriptionListenersField.get(domNotificationRouter);
- assertTrue(subscriptionListeners.iterator().hasNext());
+ subscriptionListeners = domNotificationRouter.subscriptionListeners();
+ assertSame(domNotificationSubscriptionListener,
+ subscriptionListeners.streamListeners().findAny().orElseThrow());
final DOMNotification domNotification = mock(DOMNotification.class);
doReturn("test").when(domNotification).toString();
- doReturn(SchemaPath.ROOT).when(domNotification).getType();
+ doReturn(Absolute.of(TestModel.TEST_QNAME)).when(domNotification).getType();
doReturn(TEST_CHILD).when(domNotification).getBody();
assertNotNull(domNotificationRouter.offerNotification(domNotification));
- Exception exception = null;
try {
assertNotNull(domNotificationRouter.offerNotification(domNotification, 1, TimeUnit.SECONDS));
assertNotNull(domNotificationRouter.offerNotification(domNotification, 1, TimeUnit.SECONDS));
- } catch (UnsupportedOperationException e) {
- exception = e;
+ } catch (Exception e) {
+ // FIXME: what is the point here?!
+ assertTrue(e instanceof UnsupportedOperationException);
}
- assertNotNull(exception);
+
assertNotNull(domNotificationRouter.putNotification(domNotification));
}
@Test
public void offerNotification() throws Exception {
- final DOMNotificationRouter domNotificationRouter = DOMNotificationRouter.create(1);
+ final DOMNotificationRouter domNotificationRouter = DOMNotificationRouter.create(1024);
final DOMNotification domNotification = mock(DOMNotification.class);
- doReturn(SchemaPath.ROOT).when(domNotification).getType();
+ doReturn(Absolute.of(TestModel.TEST_QNAME)).when(domNotification).getType();
doReturn(TEST_CHILD).when(domNotification).getBody();
assertNotNull(domNotificationRouter.putNotification(domNotification));
assertNotNull(domNotificationRouter.offerNotification(domNotification));
}
@Test
- public void close() throws Exception {
- final DOMNotificationRouter domNotificationRouter = DOMNotificationRouter.create(1);
+ public void testOfferNotificationWithBlocking() throws Exception {
+ final CountDownLatch latch = new CountDownLatch(1);
+ final TestListener testListener = new TestListener(latch);
+ final DOMNotification domNotification = mock(DOMNotification.class);
+ doReturn("test").when(domNotification).toString();
+ doReturn(Absolute.of(TestModel.TEST_QNAME)).when(domNotification).getType();
+ doReturn(TEST_CHILD).when(domNotification).getBody();
- final Field executorField = DOMNotificationRouter.class.getDeclaredField("executor");
- executorField.setAccessible(true);
+ try (TestRouter testRouter = new TestRouter(1)) {
+ assertNotNull(testRouter.registerNotificationListener(testListener, Absolute.of(TestModel.TEST_QNAME)));
+ assertNotNull(testRouter.registerNotificationListener(testListener, Absolute.of(TestModel.TEST2_QNAME)));
- final ExecutorService executor = (ExecutorService) executorField.get(domNotificationRouter);
+ assertNotEquals(DOMNotificationPublishService.REJECTED,
+ testRouter.offerNotification(domNotification, 3, TimeUnit.SECONDS));
+ assertTrue("Listener was not notified", latch.await(5, TimeUnit.SECONDS));
+ assertEquals("Received notifications", 1, testListener.getReceivedNotifications().size());
+
+ assertEquals(DOMNotificationPublishService.REJECTED,
+ testRouter.offerNotification(domNotification, 1, TimeUnit.SECONDS));
+ assertEquals("Received notifications", 1, testListener.getReceivedNotifications().size());
+ }
+ }
+
+ @Test
+ public void close() throws Exception {
+ final DOMNotificationRouter domNotificationRouter = DOMNotificationRouter.create(1024);
+ final ExecutorService executor = domNotificationRouter.executor();
+ final ExecutorService observer = domNotificationRouter.observer();
assertFalse(executor.isShutdown());
+ assertFalse(observer.isShutdown());
domNotificationRouter.close();
assertTrue(executor.isShutdown());
+ assertTrue(observer.isShutdown());
}
- private class TestListener implements DOMNotificationListener {
+ private static class TestListener implements DOMNotificationListener {
+ private final CountDownLatch latch;
+ private final List<DOMNotification> receivedNotifications = new ArrayList<>();
+
+ TestListener(final CountDownLatch latch) {
+ this.latch = latch;
+ }
+
@Override
- public void onNotification(@Nonnull DOMNotification notification) {}
+ public void onNotification(final DOMNotification notification) {
+ receivedNotifications.add(notification);
+ latch.countDown();
+ }
+
+ public List<DOMNotification> getReceivedNotifications() {
+ return receivedNotifications;
+ }
+ }
+
+ private static class TestRouter extends DOMNotificationRouter {
+
+ private boolean triggerRejected = false;
+
+ TestRouter(final int queueDepth) {
+ super(queueDepth);
+ }
+
+ @Override
+ ListenableFuture<? extends Object> publish(final DOMNotification notification,
+ final Collection<AbstractListenerRegistration<? extends DOMNotificationListener>> subscribers) {
+ if (triggerRejected) {
+ return REJECTED;
+ }
+
+ triggerRejected = true;
+ return super.publish(notification, subscribers);
+ }
+
+ @Override
+ public ListenableFuture<? extends Object> putNotification(final DOMNotification notification)
+ throws InterruptedException {
+ Thread.sleep(2000);
+ return super.putNotification(notification);
+ }
}
-}
\ No newline at end of file
+}