Move stream name allocation 10/108710/2
authorRobert Varga <robert.varga@pantheon.tech>
Sun, 29 Oct 2023 15:24:45 +0000 (16:24 +0100)
committerRobert Varga <robert.varga@pantheon.tech>
Mon, 30 Oct 2023 11:19:39 +0000 (12:19 +0100)
ListenersBroker should really be tracking available streams in a single
map. This patch prepares the scenery by shifting the logic which creates
stream names down to ListenersBroker and makes CreateStreamUtil pick it
up from the returned adapter.

JIRA: NETCONF-1099
Change-Id: I16b1704d3a8e98ec1a3da4e57f88e97fd07a2d6a
Signed-off-by: Robert Varga <robert.varga@pantheon.tech>
restconf/restconf-nb/src/main/java/org/opendaylight/restconf/nb/rfc8040/rests/services/impl/CreateStreamUtil.java
restconf/restconf-nb/src/main/java/org/opendaylight/restconf/nb/rfc8040/streams/listeners/ListenersBroker.java
restconf/restconf-nb/src/test/java/org/opendaylight/restconf/nb/rfc8040/rests/services/impl/RestconfStreamsSubscriptionServiceImplTest.java
restconf/restconf-nb/src/test/java/org/opendaylight/restconf/nb/rfc8040/streams/WebSocketFactoryTest.java
restconf/restconf-nb/src/test/java/org/opendaylight/restconf/nb/rfc8040/streams/listeners/AbstractNotificationListenerTest.java
restconf/restconf-nb/src/test/java/org/opendaylight/restconf/nb/rfc8040/streams/listeners/JsonNotificationListenerTest.java
restconf/restconf-nb/src/test/java/org/opendaylight/restconf/nb/rfc8040/streams/listeners/XmlNotificationListenerTest.java

index b829710993370421a16d1d9743e0f6c1fa843ec4..d1ee8a2b83083fe6d50672d7b20d3a26b49d6a39 100644 (file)
@@ -7,11 +7,7 @@
  */
 package org.opendaylight.restconf.nb.rfc8040.rests.services.impl;
 
-import static java.util.Objects.requireNonNull;
-
 import com.google.common.collect.ImmutableSet;
-import java.util.Set;
-import java.util.stream.Collectors;
 import org.eclipse.jdt.annotation.Nullable;
 import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
 import org.opendaylight.mdsal.dom.api.DOMMountPoint;
@@ -21,9 +17,7 @@ import org.opendaylight.mdsal.dom.api.DOMRpcResult;
 import org.opendaylight.mdsal.dom.api.DOMSchemaService;
 import org.opendaylight.restconf.common.errors.RestconfDocumentedException;
 import org.opendaylight.restconf.nb.rfc8040.rests.utils.RestconfStreamsConstants;
-import org.opendaylight.restconf.nb.rfc8040.streams.listeners.DeviceNotificationListenerAdaptor;
 import org.opendaylight.restconf.nb.rfc8040.streams.listeners.ListenersBroker;
-import org.opendaylight.restconf.nb.rfc8040.utils.parser.IdentifierCodec;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.device.notification.rev221106.SubscribeDeviceNotificationInput;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.device.notification.rev221106.SubscribeDeviceNotificationOutput;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.remote.rev140114.CreateDataChangeEventSubscriptionInput;
@@ -115,25 +109,16 @@ final class CreateStreamUtil {
     // FIXME: this really should be a normal RPC implementation
     static ContainerNode createDataChangeNotifiStream(final ListenersBroker listenersBroker, final ContainerNode input,
             final EffectiveModelContext refSchemaCtx) {
-        // parsing out of container with settings and path
-        final YangInstanceIdentifier path = preparePath(input);
-
-        // building of stream name
-        final StringBuilder streamNameBuilder = new StringBuilder(
-                prepareDataChangeNotifiStreamName(path, requireNonNull(refSchemaCtx), input));
-        final NotificationOutputType outputType = prepareOutputType(input);
-        if (outputType.equals(NotificationOutputType.JSON)) {
-            streamNameBuilder.append('/').append(outputType.getName());
-        }
-        final String streamName = streamNameBuilder.toString();
-
-        // registration of the listener
-        listenersBroker.registerDataChangeListener(path, streamName, outputType);
+        final var datastoreName = extractStringLeaf(input, DATASTORE_NODEID);
+        final var scopeName = extractStringLeaf(input, SCOPE_NODEID);
+        final var adapter = listenersBroker.registerDataChangeListener(refSchemaCtx,
+            datastoreName != null ? LogicalDatastoreType.valueOf(datastoreName) : LogicalDatastoreType.CONFIGURATION,
+            preparePath(input), scopeName != null ? Scope.ofName(scopeName) : Scope.BASE, prepareOutputType(input));
 
         // building of output
         return Builders.containerBuilder()
             .withNodeIdentifier(SAL_REMOTE_OUTPUT_NODEID)
-            .withChild(ImmutableNodes.leafNode(STREAM_NAME_NODEID, streamName))
+            .withChild(ImmutableNodes.leafNode(STREAM_NAME_NODEID, adapter.getStreamName()))
             .build();
     }
 
@@ -146,40 +131,20 @@ final class CreateStreamUtil {
             .sorted()
             .collect(ImmutableSet.toImmutableSet());
 
-        final var streamNameBuilder = new StringBuilder(RestconfStreamsConstants.NOTIFICATION_STREAM).append('/');
-        var haveFirst = false;
         for (var qname : qnames) {
-            final var module = refSchemaCtx.findModuleStatement(qname.getModule())
-                .orElseThrow(() -> new RestconfDocumentedException(qname + " refers to an unknown module",
-                    ErrorType.APPLICATION, ErrorTag.INVALID_VALUE));
-            final var stmt = module.findSchemaTreeNode(qname)
-                .orElseThrow(() -> new RestconfDocumentedException(qname + " refers to an notification",
-                    ErrorType.APPLICATION, ErrorTag.INVALID_VALUE));
-            if (!(stmt instanceof NotificationEffectiveStatement)) {
-                throw new RestconfDocumentedException(qname + " refers to a non-notification",
+            if (refSchemaCtx.findNotification(qname).isEmpty()) {
+                throw new RestconfDocumentedException(qname + " refers to an unknown notification",
                     ErrorType.APPLICATION, ErrorTag.INVALID_VALUE);
             }
-
-            if (haveFirst) {
-                streamNameBuilder.append(',');
-            } else {
-                haveFirst = true;
-            }
-            streamNameBuilder.append(module.argument().getLocalName()).append(':').append(qname.getLocalName());
-        }
-        final var outputType = prepareOutputType(input);
-        if (outputType.equals(NotificationOutputType.JSON)) {
-            streamNameBuilder.append('/').append(outputType.getName());
         }
 
-        final var streamName = streamNameBuilder.toString();
-
         // registration of the listener
-        listenersBroker.registerNotificationListener(qnames, streamName, outputType);
+        final var adapter = listenersBroker.registerNotificationListener(refSchemaCtx, qnames,
+            prepareOutputType(input));
 
         return Builders.containerBuilder()
             .withNodeIdentifier(SAL_REMOTE_OUTPUT_NODEID)
-            .withChild(ImmutableNodes.leafNode(STREAM_NAME_NODEID, streamName))
+            .withChild(ImmutableNodes.leafNode(STREAM_NAME_NODEID, adapter.getStreamName()))
             .build();
     }
 
@@ -198,8 +163,7 @@ final class CreateStreamUtil {
             final ListenersBroker listenersBroker) {
         // parsing out of container with settings and path
         // FIXME: ugly cast
-        final YangInstanceIdentifier path =
-            (YangInstanceIdentifier) input.findChildByArg(DEVICE_NOTIFICATION_PATH_NODEID)
+        final var path = (YangInstanceIdentifier) input.findChildByArg(DEVICE_NOTIFICATION_PATH_NODEID)
                 .map(DataContainerChild::body)
                 .orElseThrow(() -> new RestconfDocumentedException("No path specified", ErrorType.APPLICATION,
                     ErrorTag.DATA_MISSING));
@@ -222,28 +186,28 @@ final class CreateStreamUtil {
             .orElseThrow(() -> new RestconfDocumentedException("Mount point does not support notifications",
                 ErrorType.APPLICATION, ErrorTag.OPERATION_FAILED));
 
-        final EffectiveModelContext mountModelContext = mountPoint.getService(DOMSchemaService.class)
+        final var mountModelContext = mountPoint.getService(DOMSchemaService.class)
             .orElseThrow(() -> new RestconfDocumentedException("Mount point schema not available",
                 ErrorType.APPLICATION, ErrorTag.OPERATION_FAILED))
             .getGlobalContext();
-        final Set<Absolute> notificationPaths = mountModelContext.getModuleStatements().values().stream()
+        final var notificationPaths = mountModelContext.getModuleStatements().values().stream()
             .flatMap(module -> module.streamEffectiveSubstatements(NotificationEffectiveStatement.class))
             .map(notification -> Absolute.of(notification.argument()))
-            .collect(Collectors.toUnmodifiableSet());
+            .collect(ImmutableSet.toImmutableSet());
         if (notificationPaths.isEmpty()) {
             throw new RestconfDocumentedException("Device does not support notification", ErrorType.APPLICATION,
                 ErrorTag.OPERATION_FAILED);
         }
 
-        final DeviceNotificationListenerAdaptor notificationListenerAdapter = listenersBroker
-            .registerDeviceNotificationListener(deviceName, prepareOutputType(input), mountModelContext,
-                mountPointService, mountPoint.getIdentifier());
+        final var notificationListenerAdapter = listenersBroker.registerDeviceNotificationListener(deviceName,
+            prepareOutputType(input), mountModelContext, mountPointService, mountPoint.getIdentifier());
         notificationListenerAdapter.listen(mountNotifService, notificationPaths);
 
         return Builders.containerBuilder()
             .withNodeIdentifier(new NodeIdentifier(SubscribeDeviceNotificationOutput.QNAME))
-            .withChild(ImmutableNodes.leafNode(DEVICE_NOTIFICATION_STREAM_PATH, baseUrl + deviceName + "?"
-                + RestconfStreamsConstants.NOTIFICATION_TYPE + "=" + RestconfStreamsConstants.DEVICE))
+            .withChild(ImmutableNodes.leafNode(DEVICE_NOTIFICATION_STREAM_PATH,
+                baseUrl + notificationListenerAdapter.getStreamName() + "?"
+                    + RestconfStreamsConstants.NOTIFICATION_TYPE + "=" + RestconfStreamsConstants.DEVICE))
             .build();
     }
 
@@ -258,30 +222,6 @@ final class CreateStreamUtil {
         return outputName != null ? NotificationOutputType.valueOf(outputName) : NotificationOutputType.XML;
     }
 
-    /**
-     * Prepare stream name.
-     *
-     * @param path          Path of element from which data-change-event notifications are going to be generated.
-     * @param schemaContext Schema context.
-     * @param data          Container with stream settings (RPC create-stream).
-     * @return Parsed stream name.
-     */
-    private static String prepareDataChangeNotifiStreamName(final YangInstanceIdentifier path,
-            final EffectiveModelContext schemaContext, final ContainerNode data) {
-        final String datastoreName = extractStringLeaf(data, DATASTORE_NODEID);
-        final LogicalDatastoreType datastoreType = datastoreName != null ? LogicalDatastoreType.valueOf(datastoreName)
-            : LogicalDatastoreType.CONFIGURATION;
-
-        final String scopeName = extractStringLeaf(data, SCOPE_NODEID);
-        // FIXME: this is not really used
-        final Scope scope = scopeName != null ? Scope.ofName(scopeName) : Scope.BASE;
-
-        return RestconfStreamsConstants.DATA_SUBSCRIPTION
-            + "/" + ListenersBroker.createStreamNameFromUri(IdentifierCodec.serialize(path, schemaContext)
-                + "/" + RestconfStreamsConstants.DATASTORE_PARAM_NAME + "=" + datastoreType
-                + "/" + RestconfStreamsConstants.SCOPE_PARAM_NAME + "=" + scope);
-    }
-
     /**
      * Prepare {@link YangInstanceIdentifier} of stream source.
      *
index 1a5d502de86487b3c882ae8e7f3547ee198df20f..86ab5983e48e8bb0cd843ce3f472c0dcdec5b72e 100644 (file)
@@ -18,14 +18,21 @@ import java.util.concurrent.locks.StampedLock;
 import javax.inject.Inject;
 import javax.inject.Singleton;
 import org.eclipse.jdt.annotation.Nullable;
+import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
 import org.opendaylight.mdsal.dom.api.DOMMountPointService;
+import org.opendaylight.restconf.common.errors.RestconfDocumentedException;
 import org.opendaylight.restconf.nb.rfc8040.URLConstants;
 import org.opendaylight.restconf.nb.rfc8040.rests.utils.RestconfStreamsConstants;
+import org.opendaylight.restconf.nb.rfc8040.utils.parser.IdentifierCodec;
+import org.opendaylight.yang.gen.v1.urn.sal.restconf.event.subscription.rev140708.CreateDataChangeEventSubscriptionInput1.Scope;
 import org.opendaylight.yang.gen.v1.urn.sal.restconf.event.subscription.rev140708.NotificationOutputTypeGrouping.NotificationOutputType;
+import org.opendaylight.yangtools.yang.common.ErrorTag;
+import org.opendaylight.yangtools.yang.common.ErrorType;
 import org.opendaylight.yangtools.yang.common.QName;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.model.api.EffectiveModelContext;
 import org.opendaylight.yangtools.yang.model.api.NotificationDefinition;
+import org.opendaylight.yangtools.yang.model.api.stmt.NotificationEffectiveStatement;
 import org.osgi.service.component.annotations.Activate;
 import org.osgi.service.component.annotations.Component;
 import org.slf4j.Logger;
@@ -135,20 +142,24 @@ public final class ListenersBroker {
      * hasn't been created yet.
      *
      * @param path       Path to data in data repository.
-     * @param streamName Stream name.
      * @param outputType Specific type of output for notifications - XML or JSON.
      * @return Created or existing data-change listener adapter.
      */
-    public ListenerAdapter registerDataChangeListener(final YangInstanceIdentifier path, final String streamName,
+    public ListenerAdapter registerDataChangeListener(final EffectiveModelContext modelContext,
+            final LogicalDatastoreType datastore, final YangInstanceIdentifier path, final Scope scope,
             final NotificationOutputType outputType) {
-        requireNonNull(path);
-        requireNonNull(streamName);
-        requireNonNull(outputType);
+        final var sb = new StringBuilder(RestconfStreamsConstants.DATA_SUBSCRIPTION)
+            .append('/').append(createStreamNameFromUri(IdentifierCodec.serialize(path, modelContext)))
+            .append('/').append(RestconfStreamsConstants.DATASTORE_PARAM_NAME).append('=').append(datastore)
+            .append('/').append(RestconfStreamsConstants.SCOPE_PARAM_NAME).append('=').append(scope);
+        if (outputType != NotificationOutputType.XML) {
+            sb.append('/').append(outputType.getName());
+        }
 
         final long stamp = dataChangeListenersLock.writeLock();
         try {
-            return dataChangeListeners.computeIfAbsent(streamName,
-                stream -> new ListenerAdapter(path, stream, outputType, this));
+            return dataChangeListeners.computeIfAbsent(sb.toString(),
+                streamName -> new ListenerAdapter(path, streamName, outputType, this));
         } finally {
             dataChangeListenersLock.unlockWrite(stamp);
         }
@@ -158,21 +169,42 @@ public final class ListenersBroker {
      * Creates new {@link NotificationDefinition} listener using input stream name and schema path
      * if such listener haven't been created yet.
      *
+     * @param refSchemaCtx reference {@link EffectiveModelContext}
      * @param notifications {@link QName}s of accepted YANG notifications
-     * @param streamName Stream name.
      * @param outputType Specific type of output for notifications - XML or JSON.
      * @return Created or existing notification listener adapter.
      */
-    public NotificationListenerAdapter registerNotificationListener(final ImmutableSet<QName> notifications,
-            final String streamName, final NotificationOutputType outputType) {
-        requireNonNull(notifications);
-        requireNonNull(streamName);
-        requireNonNull(outputType);
+    public NotificationListenerAdapter registerNotificationListener(final EffectiveModelContext refSchemaCtx,
+            final ImmutableSet<QName> notifications, final NotificationOutputType outputType) {
+        final var sb = new StringBuilder(RestconfStreamsConstants.NOTIFICATION_STREAM).append('/');
+        var haveFirst = false;
+        for (var qname : notifications) {
+            final var module = refSchemaCtx.findModuleStatement(qname.getModule())
+                .orElseThrow(() -> new RestconfDocumentedException(qname + " refers to an unknown module",
+                    ErrorType.APPLICATION, ErrorTag.INVALID_VALUE));
+            final var stmt = module.findSchemaTreeNode(qname)
+                .orElseThrow(() -> new RestconfDocumentedException(qname + " refers to an notification",
+                    ErrorType.APPLICATION, ErrorTag.INVALID_VALUE));
+            if (!(stmt instanceof NotificationEffectiveStatement)) {
+                throw new RestconfDocumentedException(qname + " refers to a non-notification",
+                    ErrorType.APPLICATION, ErrorTag.INVALID_VALUE);
+            }
+
+            if (haveFirst) {
+                sb.append(',');
+            } else {
+                haveFirst = true;
+            }
+            sb.append(module.argument().getLocalName()).append(':').append(qname.getLocalName());
+        }
+        if (outputType != NotificationOutputType.XML) {
+            sb.append('/').append(outputType.getName());
+        }
 
         final long stamp = notificationListenersLock.writeLock();
         try {
-            return notificationListeners.computeIfAbsent(streamName,
-                stream -> new NotificationListenerAdapter(notifications, stream, outputType, this));
+            return notificationListeners.computeIfAbsent(sb.toString(),
+                streamName -> new NotificationListenerAdapter(notifications, streamName, outputType, this));
         } finally {
             notificationListenersLock.unlockWrite(stamp);
         }
@@ -182,20 +214,19 @@ public final class ListenersBroker {
      * Creates new {@link DeviceNotificationListenerAdaptor} listener using input stream name and schema path
      * if such listener haven't been created yet.
      *
-     * @param streamName Stream name.
+     * @param deviceName Device name.
      * @param outputType Specific type of output for notifications - XML or JSON.
      * @param refSchemaCtx Schema context of node
      * @param mountPointService Mount point service
      * @return Created or existing device notification listener adapter.
      */
-    public DeviceNotificationListenerAdaptor registerDeviceNotificationListener(final String streamName,
-        final NotificationOutputType outputType, final EffectiveModelContext refSchemaCtx,
-        final DOMMountPointService mountPointService, final YangInstanceIdentifier path) {
-
+    public DeviceNotificationListenerAdaptor registerDeviceNotificationListener(final String deviceName,
+            final NotificationOutputType outputType, final EffectiveModelContext refSchemaCtx,
+            final DOMMountPointService mountPointService, final YangInstanceIdentifier path) {
         final long stamp = deviceNotificationListenersLock.writeLock();
         try {
-            return deviceNotificationListeners.computeIfAbsent(streamName,
-                stream -> new DeviceNotificationListenerAdaptor(streamName, outputType, refSchemaCtx,
+            return deviceNotificationListeners.computeIfAbsent(deviceName,
+                streamName -> new DeviceNotificationListenerAdaptor(deviceName, outputType, refSchemaCtx,
                     mountPointService, path, this));
         } finally {
             deviceNotificationListenersLock.unlockWrite(stamp);
@@ -396,4 +427,4 @@ public final class ListenersBroker {
         }
         return result;
     }
-}
\ No newline at end of file
+}
index c9fb933c8697fec38a61ffd3d0d9079ec9a548fe..c48ca9ba763f99549c97c5c2ec8d791fc88ad39f 100644 (file)
@@ -12,6 +12,7 @@ import static org.junit.Assert.assertThrows;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
+import static org.opendaylight.restconf.nb.rfc8040.streams.listeners.AbstractNotificationListenerTest.MODEL_CONTEXT;
 
 import com.google.common.collect.ImmutableClassToInstanceMap;
 import java.net.URI;
@@ -20,12 +21,12 @@ import javax.ws.rs.core.MultivaluedHashMap;
 import javax.ws.rs.core.UriBuilder;
 import javax.ws.rs.core.UriInfo;
 import org.junit.Before;
-import org.junit.BeforeClass;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.mockito.Mock;
 import org.mockito.junit.MockitoJUnitRunner;
 import org.opendaylight.mdsal.common.api.CommitInfo;
+import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
 import org.opendaylight.mdsal.dom.api.DOMDataBroker;
 import org.opendaylight.mdsal.dom.api.DOMDataTreeChangeService;
 import org.opendaylight.mdsal.dom.api.DOMDataTreeWriteTransaction;
@@ -35,24 +36,22 @@ import org.opendaylight.restconf.nb.rfc8040.URLConstants;
 import org.opendaylight.restconf.nb.rfc8040.databind.DatabindContext;
 import org.opendaylight.restconf.nb.rfc8040.databind.DatabindProvider;
 import org.opendaylight.restconf.nb.rfc8040.streams.StreamsConfiguration;
+import org.opendaylight.restconf.nb.rfc8040.streams.listeners.AbstractNotificationListenerTest;
 import org.opendaylight.restconf.nb.rfc8040.streams.listeners.ListenersBroker;
 import org.opendaylight.restconf.nb.rfc8040.utils.parser.IdentifierCodec;
+import org.opendaylight.yang.gen.v1.urn.sal.restconf.event.subscription.rev140708.CreateDataChangeEventSubscriptionInput1.Scope;
 import org.opendaylight.yang.gen.v1.urn.sal.restconf.event.subscription.rev140708.NotificationOutputTypeGrouping.NotificationOutputType;
 import org.opendaylight.yangtools.concepts.ListenerRegistration;
 import org.opendaylight.yangtools.yang.common.ErrorTag;
 import org.opendaylight.yangtools.yang.common.ErrorType;
 import org.opendaylight.yangtools.yang.common.QName;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
-import org.opendaylight.yangtools.yang.model.api.EffectiveModelContext;
-import org.opendaylight.yangtools.yang.test.util.YangParserTestUtils;
 
 @RunWith(MockitoJUnitRunner.StrictStubs.class)
-public class RestconfStreamsSubscriptionServiceImplTest {
+public class RestconfStreamsSubscriptionServiceImplTest extends AbstractNotificationListenerTest {
     private static final String URI = "/rests/data/ietf-restconf-monitoring:restconf-state/streams/stream/"
             + "toaster:toaster/toasterStatus/datastore=OPERATIONAL/scope=ONE";
 
-    private static EffectiveModelContext MODEL_CONTEXT;
-
     @Mock
     private DOMDataBroker dataBroker;
     @Mock
@@ -65,17 +64,12 @@ public class RestconfStreamsSubscriptionServiceImplTest {
     private StreamsConfiguration configurationSse;
     private DatabindProvider databindProvider;
 
-    @BeforeClass
-    public static void beforeClass() {
-        MODEL_CONTEXT = YangParserTestUtils.parseYangResourceDirectory("/notifications");
-    }
 
     @Before
     public void setUp() throws URISyntaxException {
-        final var name = "data-change-event-subscription/toaster:toaster/toasterStatus/datastore=OPERATIONAL/scope=ONE";
-        listenersBroker.registerDataChangeListener(
+        listenersBroker.registerDataChangeListener(MODEL_CONTEXT, LogicalDatastoreType.OPERATIONAL,
             YangInstanceIdentifier.of(QName.create("http://netconfcentral.org/ns/toaster", "2009-11-20", "toaster")),
-            name, NotificationOutputType.JSON);
+            Scope.ONE, NotificationOutputType.JSON);
         final DOMDataTreeWriteTransaction wTx = mock(DOMDataTreeWriteTransaction.class);
         doReturn(wTx).when(dataBroker).newWriteOnlyTransaction();
         doReturn(CommitInfo.emptyFluentFuture()).when(wTx).commit();
@@ -98,10 +92,9 @@ public class RestconfStreamsSubscriptionServiceImplTest {
 
     @Test
     public void testSubscribeToStreamSSE() {
-        listenersBroker.registerDataChangeListener(
-                IdentifierCodec.deserialize("toaster:toaster/toasterStatus", MODEL_CONTEXT),
-                "data-change-event-subscription/toaster:toaster/toasterStatus/datastore=OPERATIONAL/scope=ONE",
-                NotificationOutputType.XML);
+        listenersBroker.registerDataChangeListener(MODEL_CONTEXT, LogicalDatastoreType.OPERATIONAL,
+            IdentifierCodec.deserialize("toaster:toaster/toasterStatus", MODEL_CONTEXT), Scope.ONE,
+            NotificationOutputType.XML);
         final var streamsSubscriptionService = new RestconfStreamsSubscriptionServiceImpl(dataBroker,
             notificationService, databindProvider, listenersBroker, configurationSse);
         final var response = streamsSubscriptionService.subscribeToStream(
@@ -113,10 +106,9 @@ public class RestconfStreamsSubscriptionServiceImplTest {
 
     @Test
     public void testSubscribeToStreamWS() {
-        listenersBroker.registerDataChangeListener(
-                IdentifierCodec.deserialize("toaster:toaster/toasterStatus", MODEL_CONTEXT),
-                "data-change-event-subscription/toaster:toaster/toasterStatus/datastore=OPERATIONAL/scope=ONE",
-                NotificationOutputType.XML);
+        listenersBroker.registerDataChangeListener(MODEL_CONTEXT, LogicalDatastoreType.OPERATIONAL,
+            IdentifierCodec.deserialize("toaster:toaster/toasterStatus", MODEL_CONTEXT), Scope.ONE,
+            NotificationOutputType.XML);
         final var streamsSubscriptionService = new RestconfStreamsSubscriptionServiceImpl(dataBroker,
             notificationService, databindProvider, listenersBroker, configurationWs);
         final var response = streamsSubscriptionService.subscribeToStream(
@@ -128,9 +120,8 @@ public class RestconfStreamsSubscriptionServiceImplTest {
 
     @Test
     public void testSubscribeToStreamMissingDatastoreInPath() {
-        final RestconfStreamsSubscriptionServiceImpl streamsSubscriptionService =
-                new RestconfStreamsSubscriptionServiceImpl(dataBroker, notificationService, databindProvider,
-                    listenersBroker, configurationWs);
+        final var streamsSubscriptionService = new RestconfStreamsSubscriptionServiceImpl(dataBroker,
+            notificationService, databindProvider, listenersBroker, configurationWs);
         final var errors = assertThrows(RestconfDocumentedException.class,
             () -> streamsSubscriptionService.subscribeToStream("toaster:toaster/toasterStatus/scope=ONE", uriInfo))
             .getErrors();
@@ -143,9 +134,8 @@ public class RestconfStreamsSubscriptionServiceImplTest {
 
     @Test
     public void testSubscribeToStreamMissingScopeInPath() {
-        final RestconfStreamsSubscriptionServiceImpl streamsSubscriptionService =
-                new RestconfStreamsSubscriptionServiceImpl(dataBroker, notificationService, databindProvider,
-                    listenersBroker, configurationWs);
+        final var streamsSubscriptionService = new RestconfStreamsSubscriptionServiceImpl(dataBroker,
+            notificationService, databindProvider, listenersBroker, configurationWs);
         final var errors = assertThrows(RestconfDocumentedException.class,
             () -> streamsSubscriptionService.subscribeToStream("toaster:toaster/toasterStatus/datastore=OPERATIONAL",
                 uriInfo)).getErrors();
index 0dc747beb18ecc8a0ef199cc9841a8c6ef830ec0..368f01c69695063f9493b190028cdfd9e06541ea 100644 (file)
@@ -7,64 +7,70 @@
  */
 package org.opendaylight.restconf.nb.rfc8040.streams;
 
-import static org.hamcrest.CoreMatchers.instanceOf;
-import static org.hamcrest.MatcherAssert.assertThat;
-import static org.junit.Assert.assertNull;
+import static org.junit.jupiter.api.Assertions.assertInstanceOf;
+import static org.junit.jupiter.api.Assertions.assertNull;
 import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.verify;
 
 import java.net.URI;
 import java.util.concurrent.ScheduledExecutorService;
 import org.eclipse.jetty.websocket.servlet.ServletUpgradeRequest;
 import org.eclipse.jetty.websocket.servlet.ServletUpgradeResponse;
-import org.junit.Before;
-import org.junit.Test;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
 import org.opendaylight.restconf.nb.rfc8040.streams.WebSocketInitializer.WebSocketFactory;
+import org.opendaylight.restconf.nb.rfc8040.streams.listeners.AbstractNotificationListenerTest;
 import org.opendaylight.restconf.nb.rfc8040.streams.listeners.ListenersBroker;
+import org.opendaylight.yang.gen.v1.urn.sal.restconf.event.subscription.rev140708.CreateDataChangeEventSubscriptionInput1.Scope;
 import org.opendaylight.yang.gen.v1.urn.sal.restconf.event.subscription.rev140708.NotificationOutputTypeGrouping;
 import org.opendaylight.yangtools.yang.common.QName;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 
-public class WebSocketFactoryTest {
+@ExtendWith(MockitoExtension.class)
+class WebSocketFactoryTest extends AbstractNotificationListenerTest {
     private static final String REGISTERED_STREAM_NAME = "data-change-event-subscription/"
-            + "toaster:toaster/datastore=CONFIGURATION/scope=SUBTREE";
-    private static final YangInstanceIdentifier TOASTER_YIID = YangInstanceIdentifier.builder()
-            .node(QName.create("http://netconfcentral.org/ns/toaster", "2009-11-20", "toaster"))
-            .build();
+            + "toaster:toaster/datastore=CONFIGURATION/scope=SUBTREE/JSON";
 
     private final ListenersBroker listenersBroker = new ListenersBroker();
 
-    private final WebSocketFactory webSocketFactory = new WebSocketFactory(mock(ScheduledExecutorService.class),
-        listenersBroker, 5000, 2000);
+    @Mock
+    private ScheduledExecutorService execService;
+    @Mock
+    private ServletUpgradeRequest upgradeRequest;
+    @Mock
+    private ServletUpgradeResponse upgradeResponse;
 
-    @Before
-    public void prepareListenersBroker() {
-        listenersBroker.registerDataChangeListener(TOASTER_YIID, REGISTERED_STREAM_NAME,
-                NotificationOutputTypeGrouping.NotificationOutputType.JSON);
+    private WebSocketFactory webSocketFactory;
+
+    @BeforeEach
+    void prepareListenersBroker() {
+        webSocketFactory = new WebSocketFactory(execService, listenersBroker, 5000, 2000);
+
+        listenersBroker.registerDataChangeListener(MODEL_CONTEXT, LogicalDatastoreType.CONFIGURATION,
+            YangInstanceIdentifier.of(QName.create("http://netconfcentral.org/ns/toaster", "2009-11-20", "toaster")),
+            Scope.SUBTREE, NotificationOutputTypeGrouping.NotificationOutputType.JSON);
     }
 
     @Test
-    public void createWebSocketSuccessfully() {
-        final ServletUpgradeRequest upgradeRequest = mock(ServletUpgradeRequest.class);
-        final ServletUpgradeResponse upgradeResponse = mock(ServletUpgradeResponse.class);
+    void createWebSocketSuccessfully() {
         doReturn(URI.create('/' + REGISTERED_STREAM_NAME + '/')).when(upgradeRequest).getRequestURI();
 
-        final Object webSocket = webSocketFactory.createWebSocket(upgradeRequest, upgradeResponse);
-        assertThat(webSocket, instanceOf(WebSocketSessionHandler.class));
+        assertInstanceOf(WebSocketSessionHandler.class,
+            webSocketFactory.createWebSocket(upgradeRequest, upgradeResponse));
         verify(upgradeResponse).setSuccess(true);
         verify(upgradeResponse).setStatusCode(101);
     }
 
     @Test
-    public void createWebSocketUnsuccessfully() {
-        final ServletUpgradeRequest upgradeRequest = mock(ServletUpgradeRequest.class);
-        final ServletUpgradeResponse upgradeResponse = mock(ServletUpgradeResponse.class);
+    void createWebSocketUnsuccessfully() {
         doReturn(URI.create('/' + REGISTERED_STREAM_NAME + '/' + "toasterStatus"))
             .when(upgradeRequest).getRequestURI();
 
-        final Object webSocket = webSocketFactory.createWebSocket(upgradeRequest, upgradeResponse);
-        assertNull(webSocket);
+        assertNull(webSocketFactory.createWebSocket(upgradeRequest, upgradeResponse));
         verify(upgradeResponse).setSuccess(false);
         verify(upgradeResponse).setStatusCode(404);
     }
index a1b62e289439252ced1f9fb97db6fb97b170f9d5..19273353a90c104eb8d36b62345f188004165d74 100644 (file)
@@ -7,26 +7,15 @@
  */
 package org.opendaylight.restconf.nb.rfc8040.streams.listeners;
 
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
 import org.opendaylight.yangtools.yang.common.QNameModule;
 import org.opendaylight.yangtools.yang.common.Revision;
 import org.opendaylight.yangtools.yang.common.XMLNamespace;
 import org.opendaylight.yangtools.yang.model.api.EffectiveModelContext;
 import org.opendaylight.yangtools.yang.test.util.YangParserTestUtils;
 
-abstract class AbstractNotificationListenerTest {
+public abstract class AbstractNotificationListenerTest {
     static final QNameModule MODULE = QNameModule.create(XMLNamespace.of("notifi:mod"), Revision.of("2016-11-23"));
 
-    static EffectiveModelContext SCHEMA_CONTEXT;
-
-    @BeforeClass
-    public static final void beforeClass() {
-        SCHEMA_CONTEXT = YangParserTestUtils.parseYangResourceDirectory("/notifications");
-    }
-
-    @AfterClass
-    public static final void afterClass() {
-        SCHEMA_CONTEXT = null;
-    }
+    public static final EffectiveModelContext MODEL_CONTEXT =
+        YangParserTestUtils.parseYangResourceDirectory("/notifications");
 }
index d452733d3bf37d71b6316b514f8bca3534bda767..3909a627377e9898f5bc0dcc7bd771283d1e5def 100644 (file)
@@ -164,8 +164,8 @@ public class JsonNotificationListenerTest extends AbstractNotificationListenerTe
 
     private String prepareJson(final DOMNotification notificationData, final QName schemaPathNotifi)
             throws Exception {
-        final var notifiAdapter = listenersBroker.registerNotificationListener(ImmutableSet.of(schemaPathNotifi),
-            "json-stream", NotificationOutputType.JSON);
-        return notifiAdapter.formatter().eventData(SCHEMA_CONTEXT, notificationData, Instant.now()).orElseThrow();
+        final var notifiAdapter = listenersBroker.registerNotificationListener(MODEL_CONTEXT,
+            ImmutableSet.of(schemaPathNotifi), NotificationOutputType.JSON);
+        return notifiAdapter.formatter().eventData(MODEL_CONTEXT, notificationData, Instant.now()).orElseThrow();
     }
 }
index b8907bd254ac425c533ee78c505791229ac382d9..632171c6857ab69df0356b0ea706aef019c05892 100644 (file)
@@ -156,8 +156,8 @@ public class XmlNotificationListenerTest extends AbstractNotificationListenerTes
 
     private String prepareXmlResult(final DOMNotification notificationData, final QName schemaPathNotifi)
             throws Exception {
-        final var notifiAdapter = listenersBroker.registerNotificationListener(ImmutableSet.of(schemaPathNotifi),
-            "xml-stream", NotificationOutputType.XML);
-        return notifiAdapter.formatter().eventData(SCHEMA_CONTEXT, notificationData, Instant.now()).orElseThrow();
+        final var notifiAdapter = listenersBroker.registerNotificationListener(MODEL_CONTEXT,
+            ImmutableSet.of(schemaPathNotifi), NotificationOutputType.XML);
+        return notifiAdapter.formatter().eventData(MODEL_CONTEXT, notificationData, Instant.now()).orElseThrow();
     }
 }