Use random UUIDs for stream names 45/108845/1
authorRobert Varga <robert.varga@pantheon.tech>
Fri, 3 Nov 2023 17:47:56 +0000 (18:47 +0100)
committerRobert Varga <robert.varga@pantheon.tech>
Fri, 3 Nov 2023 20:29:26 +0000 (21:29 +0100)
Deriving stream names from their parameters is rather tacky, as it opens
up race conditions, lifecycle leaks and the lure to try to parse the
names out.

This patch refactors the logic to split stream name allocation, which is
now random, and creation of the actual stream via StreamFactory.

Since we now know names cannot conflict and that each AbstractStream has
its name embedded, we can do away with BiMaps and StampedLocks and
instead keep all streams in a single ConcurrentMap.

We end up removing a ton of duplicated code, allowing us to focus on
the lifecycle itself.

This in turn shows that we do not have an end-of-stream indicator for
subscribers to see -- so introduce it and pick up the resulting
DeviceNotificationListenerAdaptor clean ups.

This refactor also shows that {Json,Xml}NotificationListenerTest are
only testing NotificationFormatter, so we rename them and migrate to
JUnit5.

JIRA: NETCONF-1102
Change-Id: I043bebbd9240281cda9e29b41881de43f0a6b769
Signed-off-by: Robert Varga <robert.varga@pantheon.tech>
17 files changed:
restconf/restconf-nb/src/main/java/org/opendaylight/restconf/nb/rfc8040/rests/services/impl/RestconfDataStreamServiceImpl.java
restconf/restconf-nb/src/main/java/org/opendaylight/restconf/nb/rfc8040/rests/services/impl/RestconfInvokeOperationsServiceImpl.java
restconf/restconf-nb/src/main/java/org/opendaylight/restconf/nb/rfc8040/streams/AbstractNotificationListenerAdaptor.java
restconf/restconf-nb/src/main/java/org/opendaylight/restconf/nb/rfc8040/streams/AbstractStream.java
restconf/restconf-nb/src/main/java/org/opendaylight/restconf/nb/rfc8040/streams/DeviceNotificationListenerAdaptor.java
restconf/restconf-nb/src/main/java/org/opendaylight/restconf/nb/rfc8040/streams/JSONNotificationFormatter.java
restconf/restconf-nb/src/main/java/org/opendaylight/restconf/nb/rfc8040/streams/ListenersBroker.java
restconf/restconf-nb/src/main/java/org/opendaylight/restconf/nb/rfc8040/streams/RestconfStreamsConstants.java [deleted file]
restconf/restconf-nb/src/main/java/org/opendaylight/restconf/nb/rfc8040/streams/SSESessionHandler.java
restconf/restconf-nb/src/main/java/org/opendaylight/restconf/nb/rfc8040/streams/StreamSessionHandler.java
restconf/restconf-nb/src/main/java/org/opendaylight/restconf/nb/rfc8040/streams/WebSocketFactory.java
restconf/restconf-nb/src/main/java/org/opendaylight/restconf/nb/rfc8040/streams/WebSocketSessionHandler.java
restconf/restconf-nb/src/main/java/org/opendaylight/restconf/nb/rfc8040/streams/XMLNotificationFormatter.java
restconf/restconf-nb/src/test/java/org/opendaylight/restconf/nb/rfc8040/streams/JSONNotificationFormatterTest.java [moved from restconf/restconf-nb/src/test/java/org/opendaylight/restconf/nb/rfc8040/streams/JsonNotificationListenerTest.java with 59% similarity]
restconf/restconf-nb/src/test/java/org/opendaylight/restconf/nb/rfc8040/streams/ListenersBrokerTest.java
restconf/restconf-nb/src/test/java/org/opendaylight/restconf/nb/rfc8040/streams/WebSocketFactoryTest.java
restconf/restconf-nb/src/test/java/org/opendaylight/restconf/nb/rfc8040/streams/XMLNotificationFormatterTest.java [moved from restconf/restconf-nb/src/test/java/org/opendaylight/restconf/nb/rfc8040/streams/XmlNotificationListenerTest.java with 63% similarity]

index 958f745127fe0fb8ad532f08aed3d4cbe360edf4..afdb0e31dbe45a099f0ebd5df88189e18360059f 100644 (file)
@@ -57,7 +57,7 @@ public final class RestconfDataStreamServiceImpl {
     @Produces(MediaType.SERVER_SENT_EVENTS)
     public void getSSE(@PathParam("streamName") final String streamName, @Context final SseEventSink sink,
             @Context final Sse sse) {
-        final var listener = listenersBroker.listenerFor(streamName);
+        final var listener = listenersBroker.getStream(streamName);
         if (listener == null) {
             LOG.debug("Listener for stream with name {} was not found.", streamName);
             throw new WebApplicationException("No such stream: " + streamName, Status.NOT_FOUND);
index 94c0898921816f630cab535caafe30b832cb11b1..b83f10f900b47734db3d050fbd7b1d5e8268f9c2 100644 (file)
@@ -164,7 +164,7 @@ public final class RestconfInvokeOperationsServiceImpl {
             } else if (CreateNotificationStream.QNAME.equals(type)) {
                 return listenersBroker.createNotificationStream(input, localDatabind.modelContext());
             } else if (SubscribeDeviceNotification.QNAME.equals(type)) {
-                return listenersBroker.createDeviceNotificationListener(input,
+                return listenersBroker.createDeviceNotificationStream(input,
                     listenersBroker.prepareUriByStreamName(uriInfo, "").toString(), mountPointService);
             }
         }
index 29d9305f1c2f9d44c0c45dd12d7b609d822fb8d6..34511f1c73331398b72acb6c1170958da92a023f 100644 (file)
@@ -13,7 +13,6 @@ import org.opendaylight.mdsal.dom.api.DOMEvent;
 import org.opendaylight.mdsal.dom.api.DOMNotification;
 import org.opendaylight.mdsal.dom.api.DOMNotificationListener;
 import org.opendaylight.yang.gen.v1.urn.sal.restconf.event.subscription.rev140708.NotificationOutputTypeGrouping.NotificationOutputType;
-import org.opendaylight.yangtools.yang.data.codec.gson.JSONCodecFactorySupplier;
 import org.opendaylight.yangtools.yang.model.api.EffectiveModelContext;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -25,8 +24,6 @@ import org.slf4j.LoggerFactory;
 abstract class AbstractNotificationListenerAdaptor extends AbstractStream<DOMNotification>
         implements DOMNotificationListener {
     private static final Logger LOG = LoggerFactory.getLogger(AbstractNotificationListenerAdaptor.class);
-    private static final NotificationFormatterFactory JSON_FORMATTER_FACTORY =
-        JSONNotificationFormatter.createFactory(JSONCodecFactorySupplier.RFC7951);
 
     AbstractNotificationListenerAdaptor(final String streamName, final NotificationOutputType outputType,
             final ListenersBroker listenersBroker) {
@@ -35,7 +32,7 @@ abstract class AbstractNotificationListenerAdaptor extends AbstractStream<DOMNot
 
     private static NotificationFormatterFactory getFormatterFactory(final NotificationOutputType outputType) {
         return switch (outputType) {
-            case JSON -> JSON_FORMATTER_FACTORY;
+            case JSON -> JSONNotificationFormatter.FACTORY;
             case XML -> XMLNotificationFormatter.FACTORY;
         };
     }
index 5f3ec6786c6d32836d37670dd6ef97726cebaf2f..afa79dcae105c0fb5e8f449c09d600d380715302 100644 (file)
@@ -13,16 +13,13 @@ import static java.util.Objects.requireNonNull;
 
 import com.google.common.base.MoreObjects;
 import com.google.common.base.MoreObjects.ToStringHelper;
-import com.google.common.util.concurrent.ListenableFuture;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.Set;
-import java.util.concurrent.ExecutionException;
 import javax.xml.xpath.XPathExpressionException;
 import org.checkerframework.checker.lock.qual.GuardedBy;
 import org.checkerframework.checker.lock.qual.Holding;
 import org.eclipse.jdt.annotation.NonNull;
-import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
 import org.opendaylight.mdsal.dom.api.DOMDataBroker;
 import org.opendaylight.restconf.common.errors.RestconfDocumentedException;
 import org.opendaylight.restconf.nb.rfc8040.ReceiveEventsParams;
@@ -37,7 +34,7 @@ import org.slf4j.LoggerFactory;
 /**
  * Base superclass for all stream types.
  */
-abstract class AbstractStream<T> implements AutoCloseable {
+abstract class AbstractStream<T> {
     private static final Logger LOG = LoggerFactory.getLogger(AbstractStream.class);
 
     private final EventFormatterFactory<T> formatterFactory;
@@ -95,25 +92,6 @@ abstract class AbstractStream<T> implements AutoCloseable {
         return !subscribers.isEmpty();
     }
 
-    /**
-     * Return all subscribers of listener.
-     *
-     * @return Set of all subscribers.
-     */
-    final synchronized Set<StreamSessionHandler> getSubscribers() {
-        return new HashSet<>(subscribers);
-    }
-
-    @Override
-    public final synchronized void close() throws InterruptedException, ExecutionException {
-        if (registration != null) {
-            registration.close();
-            registration = null;
-        }
-        deleteDataInDS().get();
-        subscribers.clear();
-    }
-
     /**
      * Registers {@link StreamSessionHandler} subscriber.
      *
@@ -127,15 +105,41 @@ abstract class AbstractStream<T> implements AutoCloseable {
     }
 
     /**
-     * Removes {@link StreamSessionHandler} subscriber.
+     * Removes {@link StreamSessionHandler} subscriber. If this was the last subscriber also shut down this stream and
+     * initiate its removal from global state.
      *
      * @param subscriber SSE or WS session handler.
      */
     synchronized void removeSubscriber(final StreamSessionHandler subscriber) {
         subscribers.remove(subscriber);
         LOG.debug("Subscriber {} is removed", subscriber);
-        if (!hasSubscribers()) {
-            listenersBroker.removeAndCloseListener(this);
+        if (subscribers.isEmpty()) {
+            closeRegistration();
+            listenersBroker.removeStream(dataBroker, this);
+        }
+    }
+
+    /**
+     * Signal the end-of-stream condition to subscribers, shut down this stream and initiate its removal from global
+     * state.
+     */
+    final synchronized void endOfStream() {
+        closeRegistration();
+
+        final var it = subscribers.iterator();
+        while (it.hasNext()) {
+            it.next().endOfStream();
+            it.remove();
+        }
+
+        listenersBroker.removeStream(dataBroker, this);
+    }
+
+    @Holding("this")
+    private void closeRegistration() {
+        if (registration != null) {
+            registration.close();
+            registration = null;
         }
     }
 
@@ -234,22 +238,11 @@ abstract class AbstractStream<T> implements AutoCloseable {
      */
     @SuppressWarnings("checkstyle:hiddenField")
     // FIXME: this is pure lifecycle nightmare just because ...
-    public void setCloseVars(final DOMDataBroker dataBroker, final DatabindProvider databindProvider) {
+    public synchronized void setCloseVars(final DOMDataBroker dataBroker, final DatabindProvider databindProvider) {
         this.dataBroker = dataBroker;
         this.databindProvider = databindProvider;
     }
 
-    /**
-     * Delete data in DS.
-     */
-    // FIXME: here we touch datastore, which probably should be done by whoever instantiated us or created the resource,
-    //        or they should be giving us the transaction
-    private ListenableFuture<?> deleteDataInDS() {
-        final var wTx = dataBroker.newWriteOnlyTransaction();
-        wTx.delete(LogicalDatastoreType.OPERATIONAL, RestconfStateStreams.restconfStateStreamPath(streamName));
-        return wTx.commit();
-    }
-
     @Override
     public final String toString() {
         return addToStringAttributes(MoreObjects.toStringHelper(this)).toString();
index 639997d404edc1c5fb5bed2e911bbff0b401c077..6d5ae374e2348244671a46b12488a49886356178 100644 (file)
@@ -19,16 +19,12 @@ import org.opendaylight.yangtools.concepts.ListenerRegistration;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.model.api.EffectiveModelContext;
 import org.opendaylight.yangtools.yang.model.api.stmt.SchemaNodeIdentifier.Absolute;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 /**
  * {@link DeviceNotificationListenerAdaptor} is responsible to track events on notifications.
  */
 public final class DeviceNotificationListenerAdaptor extends AbstractNotificationListenerAdaptor
         implements DOMMountPointListener {
-    private static final Logger LOG = LoggerFactory.getLogger(DeviceNotificationListenerAdaptor.class);
-
     private final @NonNull EffectiveModelContext effectiveModel;
     private final @NonNull DOMMountPointService mountPointService;
     private final @NonNull YangInstanceIdentifier instanceIdentifier;
@@ -71,20 +67,8 @@ public final class DeviceNotificationListenerAdaptor extends AbstractNotificatio
     @Override
     public void onMountPointRemoved(final YangInstanceIdentifier path) {
         if (instanceIdentifier.equals(path)) {
-            getSubscribers().forEach(subscriber -> {
-                if (subscriber.isConnected()) {
-                    subscriber.sendDataMessage("Device disconnected");
-                }
-                if (subscriber instanceof SSESessionHandler sseSessionHandler) {
-                    try {
-                        sseSessionHandler.close();
-                    } catch (IllegalStateException e) {
-                        LOG.warn("Ignoring exception while closing sse session");
-                    }
-                }
-            });
-            listenersBroker.removeAndCloseDeviceNotificationListener(this);
             resetListenerRegistration();
+            endOfStream();
         }
     }
 }
index 07b6a4f02f3979e667f7f0835043f65c8f656067..c61b12d8256abace217e6543e3c3841af838b229 100644 (file)
@@ -7,8 +7,7 @@
  */
 package org.opendaylight.restconf.nb.rfc8040.streams;
 
-import static java.util.Objects.requireNonNull;
-
+import com.google.common.annotations.VisibleForTesting;
 import com.google.gson.stream.JsonWriter;
 import java.io.IOException;
 import java.io.StringWriter;
@@ -22,40 +21,31 @@ import org.opendaylight.yangtools.yang.data.codec.gson.JSONNormalizedNodeStreamW
 import org.opendaylight.yangtools.yang.model.api.EffectiveModelContext;
 
 final class JSONNotificationFormatter extends NotificationFormatter {
-    private static final @NonNull String NOTIFICATION_NAME;
+    private static final @NonNull String NOTIFICATION_NAME =
+        $YangModuleInfoImpl.getInstance().getName().getLocalName() + ":notification";
+    @VisibleForTesting
+    static final JSONNotificationFormatter EMPTY = new JSONNotificationFormatter(TextParameters.EMPTY);
 
-    static {
-        final var ietfRestconfName = $YangModuleInfoImpl.getInstance().getName();
-        NOTIFICATION_NAME = ietfRestconfName.getLocalName() + ":notification";
-    }
+    static final NotificationFormatterFactory FACTORY = new NotificationFormatterFactory(EMPTY) {
+        @Override
+        JSONNotificationFormatter getFormatter(final TextParameters textParams, final String xpathFilter)
+                throws XPathExpressionException {
+            return new JSONNotificationFormatter(textParams, xpathFilter);
+        }
 
-    private final JSONCodecFactorySupplier codecSupplier;
+        @Override
+        JSONNotificationFormatter newFormatter(final TextParameters textParams) {
+            return new JSONNotificationFormatter(textParams);
+        }
+    };
 
-    private JSONNotificationFormatter(final TextParameters textParams, final JSONCodecFactorySupplier codecSupplier) {
+    private JSONNotificationFormatter(final TextParameters textParams) {
         super(textParams);
-        this.codecSupplier = requireNonNull(codecSupplier);
     }
 
-    private JSONNotificationFormatter(final TextParameters textParams, final String xpathFilter,
-            final JSONCodecFactorySupplier codecSupplier) throws XPathExpressionException {
+    private JSONNotificationFormatter(final TextParameters textParams, final String xpathFilter)
+            throws XPathExpressionException {
         super(textParams, xpathFilter);
-        this.codecSupplier = requireNonNull(codecSupplier);
-    }
-
-    static NotificationFormatterFactory createFactory(final JSONCodecFactorySupplier codecSupplier) {
-        final var empty = new JSONNotificationFormatter(TextParameters.EMPTY, codecSupplier);
-        return new NotificationFormatterFactory(empty) {
-            @Override
-            JSONNotificationFormatter getFormatter(final TextParameters textParams, final String xpathFilter)
-                    throws XPathExpressionException {
-                return new JSONNotificationFormatter(textParams, xpathFilter, codecSupplier);
-            }
-
-            @Override
-            JSONNotificationFormatter newFormatter(final TextParameters textParams) {
-                return new JSONNotificationFormatter(textParams, codecSupplier);
-            }
-        };
     }
 
     @Override
@@ -67,7 +57,8 @@ final class JSONNotificationFormatter extends NotificationFormatter {
                     .name(NOTIFICATION_NAME).beginObject()
                         .name("event-time").value(toRFC3339(now));
                 writeNotificationBody(JSONNormalizedNodeStreamWriter.createNestedWriter(
-                    codecSupplier.getShared(schemaContext), input.getType(), null, jsonWriter), input.getBody());
+                    JSONCodecFactorySupplier.RFC7951.getShared(schemaContext), input.getType(), null, jsonWriter),
+                    input.getBody());
                 jsonWriter.endObject().endObject();
             }
             return writer.toString();
index 283fe5ca346f208c136636dd09776eb665b434e7..1146ad7230d6a1a7076e3743f6566e42ecca32bb 100644 (file)
@@ -7,23 +7,22 @@
  */
 package org.opendaylight.restconf.nb.rfc8040.streams;
 
-import static com.google.common.base.Strings.isNullOrEmpty;
 import static java.util.Objects.requireNonNull;
 
-import com.google.common.collect.BiMap;
-import com.google.common.collect.HashBiMap;
 import com.google.common.collect.ImmutableSet;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.MoreExecutors;
 import java.net.URI;
 import java.util.Optional;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.locks.StampedLock;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
 import javax.ws.rs.core.UriInfo;
 import org.eclipse.jdt.annotation.NonNull;
 import org.eclipse.jdt.annotation.Nullable;
+import org.opendaylight.mdsal.common.api.CommitInfo;
 import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
 import org.opendaylight.mdsal.dom.api.DOMDataBroker;
-import org.opendaylight.mdsal.dom.api.DOMDataTreeWriteOperations;
-import org.opendaylight.mdsal.dom.api.DOMDataTreeWriteTransaction;
 import org.opendaylight.mdsal.dom.api.DOMMountPoint;
 import org.opendaylight.mdsal.dom.api.DOMMountPointService;
 import org.opendaylight.mdsal.dom.api.DOMNotificationService;
@@ -31,22 +30,19 @@ import org.opendaylight.mdsal.dom.api.DOMRpcResult;
 import org.opendaylight.mdsal.dom.api.DOMSchemaService;
 import org.opendaylight.restconf.common.errors.RestconfDocumentedException;
 import org.opendaylight.restconf.common.errors.RestconfFuture;
-import org.opendaylight.restconf.nb.rfc8040.ReceiveEventsParams;
 import org.opendaylight.restconf.nb.rfc8040.URLConstants;
 import org.opendaylight.restconf.nb.rfc8040.databind.DatabindProvider;
-import org.opendaylight.restconf.nb.rfc8040.utils.parser.IdentifierCodec;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.device.notification.rev221106.SubscribeDeviceNotificationInput;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.device.notification.rev221106.SubscribeDeviceNotificationOutput;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.remote.rev140114.CreateDataChangeEventSubscriptionInput;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.remote.rev140114.CreateDataChangeEventSubscriptionOutput;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.remote.rev140114.CreateNotificationStreamInput;
-import org.opendaylight.yang.gen.v1.urn.sal.restconf.event.subscription.rev140708.CreateDataChangeEventSubscriptionInput1.Scope;
+import org.opendaylight.yang.gen.v1.urn.sal.restconf.event.subscription.rev140708.CreateDataChangeEventSubscriptionInput1;
 import org.opendaylight.yang.gen.v1.urn.sal.restconf.event.subscription.rev140708.NotificationOutputTypeGrouping;
 import org.opendaylight.yang.gen.v1.urn.sal.restconf.event.subscription.rev140708.NotificationOutputTypeGrouping.NotificationOutputType;
 import org.opendaylight.yangtools.yang.common.ErrorTag;
 import org.opendaylight.yangtools.yang.common.ErrorType;
 import org.opendaylight.yangtools.yang.common.QName;
-import org.opendaylight.yangtools.yang.common.QNameModule;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifier;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifierWithPredicates;
@@ -55,11 +51,9 @@ import org.opendaylight.yangtools.yang.data.api.schema.DataContainerChild;
 import org.opendaylight.yangtools.yang.data.api.schema.LeafNode;
 import org.opendaylight.yangtools.yang.data.api.schema.LeafSetEntryNode;
 import org.opendaylight.yangtools.yang.data.api.schema.LeafSetNode;
-import org.opendaylight.yangtools.yang.data.api.schema.MapEntryNode;
 import org.opendaylight.yangtools.yang.data.impl.schema.Builders;
 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
 import org.opendaylight.yangtools.yang.model.api.EffectiveModelContext;
-import org.opendaylight.yangtools.yang.model.api.NotificationDefinition;
 import org.opendaylight.yangtools.yang.model.api.stmt.NotificationEffectiveStatement;
 import org.opendaylight.yangtools.yang.model.api.stmt.SchemaNodeIdentifier.Absolute;
 import org.slf4j.Logger;
@@ -105,6 +99,22 @@ public abstract sealed class ListenersBroker {
         }
     }
 
+    /**
+     * Factory interface for creating instances of {@link AbstractStream}.
+     *
+     * @param <T> {@link AbstractStream} type
+     */
+    @FunctionalInterface
+    public interface StreamFactory<T extends AbstractStream<?>> {
+        /**
+         * Create a stream with the supplied name.
+         *
+         * @param name Stream name
+         * @return An {@link AbstractStream}
+         */
+        @NonNull T createStream(@NonNull String name);
+    }
+
     /**
      * Holder of all handlers for notifications.
      */
@@ -177,26 +187,15 @@ public abstract sealed class ListenersBroker {
 
     private static final Logger LOG = LoggerFactory.getLogger(ListenersBroker.class);
 
-    // Prefixes for stream names
-    private static final String DATA_SUBSCRIPTION = "data-change-event-subscription";
-    private static final String NOTIFICATION_STREAM = "notification-stream";
-    private static final String DEVICE_NOTIFICATION_STREAM = "device-notification-stream";
-
-    private static final QNameModule SAL_REMOTE_AUGMENT = NotificationOutputTypeGrouping.QNAME.getModule();
-
-    private static final QNameModule DEVICE_NOTIFICATION_MODULE = SubscribeDeviceNotificationInput.QNAME.getModule();
     private static final QName DATASTORE_QNAME =
-        QName.create(SAL_REMOTE_AUGMENT, RestconfStreamsConstants.DATASTORE_PARAM_NAME).intern();
-    private static final QName SCOPE_QNAME =
-        QName.create(SAL_REMOTE_AUGMENT, RestconfStreamsConstants.SCOPE_PARAM_NAME).intern();
+        QName.create(CreateDataChangeEventSubscriptionInput1.QNAME, "datastore").intern();
     private static final QName OUTPUT_TYPE_QNAME =
-        QName.create(SAL_REMOTE_AUGMENT, "notification-output-type").intern();
+        QName.create(NotificationOutputTypeGrouping.QNAME, "notification-output-type").intern();
     private static final QName DEVICE_NOTIFICATION_PATH_QNAME =
-        QName.create(DEVICE_NOTIFICATION_MODULE, "path").intern();
+        QName.create(SubscribeDeviceNotificationInput.QNAME, "path").intern();
     private static final QName DEVICE_NOTIFICATION_STREAM_PATH =
         QName.create(DEVICE_NOTIFICATION_PATH_QNAME, "stream-path").intern();
     private static final NodeIdentifier DATASTORE_NODEID = NodeIdentifier.create(DATASTORE_QNAME);
-    private static final NodeIdentifier SCOPE_NODEID = NodeIdentifier.create(SCOPE_QNAME);
     private static final NodeIdentifier OUTPUT_TYPE_NODEID = NodeIdentifier.create(OUTPUT_TYPE_QNAME);
     private static final NodeIdentifier DEVICE_NOTIFICATION_PATH_NODEID =
         NodeIdentifier.create(DEVICE_NOTIFICATION_PATH_QNAME);
@@ -209,355 +208,74 @@ public abstract sealed class ListenersBroker {
     private static final NodeIdentifier STREAM_NAME_NODEID =
         NodeIdentifier.create(QName.create(CreateDataChangeEventSubscriptionOutput.QNAME, "stream-name").intern());
 
-    private final StampedLock dataChangeListenersLock = new StampedLock();
-    private final StampedLock notificationListenersLock = new StampedLock();
-    private final StampedLock deviceNotificationListenersLock = new StampedLock();
-    private final BiMap<String, ListenerAdapter> dataChangeListeners = HashBiMap.create();
-    private final BiMap<String, NotificationListenerAdapter> notificationListeners = HashBiMap.create();
-    private final BiMap<String, DeviceNotificationListenerAdaptor> deviceNotificationListeners = HashBiMap.create();
+    private final ConcurrentMap<String, AbstractStream<?>> streams = new ConcurrentHashMap<>();
 
     private ListenersBroker() {
         // Hidden on purpose
     }
 
     /**
-     * Gets {@link ListenerAdapter} specified by stream identification.
-     *
-     * @param streamName Stream name.
-     * @return {@link ListenerAdapter} specified by stream name or {@code null} if listener with specified stream name
-     *         does not exist.
-     * @throws NullPointerException in {@code streamName} is {@code null}
-     */
-    public final @Nullable ListenerAdapter dataChangeListenerFor(final String streamName) {
-        requireNonNull(streamName);
-
-        final long stamp = dataChangeListenersLock.readLock();
-        try {
-            return dataChangeListeners.get(streamName);
-        } finally {
-            dataChangeListenersLock.unlockRead(stamp);
-        }
-    }
-
-    /**
-     * Gets {@link NotificationListenerAdapter} specified by stream name.
-     *
-     * @param streamName Stream name.
-     * @return {@link NotificationListenerAdapter} specified by stream name or {@code null} if listener with specified
-     *         stream name does not exist.
-     * @throws NullPointerException in {@code streamName} is {@code null}
-     */
-    public final @Nullable NotificationListenerAdapter notificationListenerFor(final String streamName) {
-        requireNonNull(streamName);
-
-        final long stamp = notificationListenersLock.readLock();
-        try {
-            return notificationListeners.get(streamName);
-        } finally {
-            notificationListenersLock.unlockRead(stamp);
-        }
-    }
-
-    /**
-     * Get listener for device path.
-     *
-     * @param streamName name.
-     * @return {@link DeviceNotificationListenerAdaptor} specified by stream name or {@code null} if listener with
-     *         specified stream name does not exist.
-     * @throws NullPointerException in {@code path} is {@code null}
-     */
-    public final @Nullable DeviceNotificationListenerAdaptor deviceNotificationListenerFor(final String streamName) {
-        requireNonNull(streamName);
-
-        final long stamp = deviceNotificationListenersLock.readLock();
-        try {
-            return deviceNotificationListeners.get(streamName);
-        } finally {
-            deviceNotificationListenersLock.unlockRead(stamp);
-        }
-    }
-
-    /**
-     * Get listener for stream-name.
+     * Get an {@link AbstractStream} by its name.
      *
      * @param streamName Stream name.
-     * @return {@link NotificationListenerAdapter} or {@link ListenerAdapter} object wrapped in {@link Optional}
-     *     or {@link Optional#empty()} if listener with specified stream name doesn't exist.
-     */
-    public final @Nullable AbstractStream<?> listenerFor(final String streamName) {
-        if (streamName.startsWith(NOTIFICATION_STREAM)) {
-            return notificationListenerFor(streamName);
-        } else if (streamName.startsWith(DATA_SUBSCRIPTION)) {
-            return dataChangeListenerFor(streamName);
-        } else if (streamName.startsWith(DEVICE_NOTIFICATION_STREAM)) {
-            return deviceNotificationListenerFor(streamName);
-        } else {
-            return null;
-        }
-    }
-
-    /**
-     * Creates new {@link ListenerAdapter} listener using input stream name and path if such listener
-     * hasn't been created yet.
-     *
-     * @param path       Path to data in data repository.
-     * @param outputType Specific type of output for notifications - XML or JSON.
-     * @return Created or existing data-change listener adapter.
+     * @return An {@link AbstractStream}, or {@code null} if the stream with specified name does not exist.
+     * @throws NullPointerException if {@code streamName} is {@code null}
      */
-    public final ListenerAdapter registerDataChangeListener(final EffectiveModelContext modelContext,
-            final LogicalDatastoreType datastore, final YangInstanceIdentifier path, final Scope scope,
-            final NotificationOutputType outputType) {
-        final var sb = new StringBuilder(DATA_SUBSCRIPTION)
-            .append('/').append(createStreamNameFromUri(IdentifierCodec.serialize(path, modelContext)))
-            .append('/').append(RestconfStreamsConstants.DATASTORE_PARAM_NAME).append('=').append(datastore)
-            .append('/').append(RestconfStreamsConstants.SCOPE_PARAM_NAME).append('=').append(scope);
-        if (outputType != NotificationOutputType.XML) {
-            sb.append('/').append(outputType.getName());
-        }
-
-        final long stamp = dataChangeListenersLock.writeLock();
-        try {
-            return dataChangeListeners.computeIfAbsent(sb.toString(),
-                streamName -> new ListenerAdapter(streamName, outputType, this, datastore, path));
-        } finally {
-            dataChangeListenersLock.unlockWrite(stamp);
-        }
+    public final @Nullable AbstractStream<?> getStream(final String streamName) {
+        return streams.get(streamName);
     }
 
     /**
-     * Creates new {@link NotificationDefinition} listener using input stream name and schema path
-     * if such listener haven't been created yet.
+     * Create an {@link AbstractStream} with a unique name. This method will atomically generate a stream name, create
+     * the corresponding instance and register it
      *
-     * @param refSchemaCtx reference {@link EffectiveModelContext}
-     * @param notifications {@link QName}s of accepted YANG notifications
-     * @param outputType Specific type of output for notifications - XML or JSON.
-     * @return Created or existing notification listener adapter.
+     * @param <T> Stream type
+     * @param factory Factory for creating the actual stream instance
+     * @return An {@link AbstractStream} instance
+     * @throws NullPointerException if {@code factory} is {@code null}
      */
-    public final NotificationListenerAdapter registerNotificationListener(final EffectiveModelContext refSchemaCtx,
-            final ImmutableSet<QName> notifications, final NotificationOutputType outputType) {
-        final var sb = new StringBuilder(NOTIFICATION_STREAM).append('/');
-        var haveFirst = false;
-        for (var qname : notifications) {
-            final var module = refSchemaCtx.findModuleStatement(qname.getModule())
-                .orElseThrow(() -> new RestconfDocumentedException(qname + " refers to an unknown module",
-                    ErrorType.APPLICATION, ErrorTag.INVALID_VALUE));
-            final var stmt = module.findSchemaTreeNode(qname)
-                .orElseThrow(() -> new RestconfDocumentedException(qname + " refers to an notification",
-                    ErrorType.APPLICATION, ErrorTag.INVALID_VALUE));
-            if (!(stmt instanceof NotificationEffectiveStatement)) {
-                throw new RestconfDocumentedException(qname + " refers to a non-notification",
-                    ErrorType.APPLICATION, ErrorTag.INVALID_VALUE);
-            }
+    public final <T extends AbstractStream<?>> @NonNull T createStream(final StreamFactory<T> factory) {
+        String name;
+        T stream;
+        do {
+            // Use Type 4 (random) UUID. While we could just use it as a plain string, be nice to observers and anchor
+            // it into UUID URN namespace as defined by RFC4122
+            name = "urn:uuid:" + UUID.randomUUID().toString();
+            stream = factory.createStream(name);
+        } while (streams.putIfAbsent(name, stream) != null);
 
-            if (haveFirst) {
-                sb.append(',');
-            } else {
-                haveFirst = true;
-            }
-            sb.append(module.argument().getLocalName()).append(':').append(qname.getLocalName());
-        }
-        if (outputType != NotificationOutputType.XML) {
-            sb.append('/').append(outputType.getName());
-        }
-
-        final long stamp = notificationListenersLock.writeLock();
-        try {
-            return notificationListeners.computeIfAbsent(sb.toString(),
-                streamName -> new NotificationListenerAdapter(streamName, outputType, this, notifications));
-        } finally {
-            notificationListenersLock.unlockWrite(stamp);
-        }
+        return stream;
     }
 
     /**
-     * Creates new {@link DeviceNotificationListenerAdaptor} listener using input stream name and schema path
-     * if such listener haven't been created yet.
+     * Remove a particular stream and remove its entry from operational datastore.
      *
-     * @param deviceName Device name.
-     * @param outputType Specific type of output for notifications - XML or JSON.
-     * @param refSchemaCtx Schema context of node
-     * @param mountPointService Mount point service
-     * @return Created or existing device notification listener adapter.
-     */
-    private DeviceNotificationListenerAdaptor registerDeviceNotificationListener(final String deviceName,
-            final NotificationOutputType outputType, final EffectiveModelContext refSchemaCtx,
-            final DOMMountPointService mountPointService, final YangInstanceIdentifier path) {
-        final var sb = new StringBuilder(DEVICE_NOTIFICATION_STREAM).append('/')
-            .append(deviceName);
-
-        final long stamp = deviceNotificationListenersLock.writeLock();
-        try {
-            return deviceNotificationListeners.computeIfAbsent(sb.toString(),
-                streamName -> new DeviceNotificationListenerAdaptor(streamName, outputType, this, refSchemaCtx,
-                    mountPointService, path));
-        } finally {
-            deviceNotificationListenersLock.unlockWrite(stamp);
-        }
-    }
-
-    /**
-     * Removal and closing of all data-change-event and notification listeners.
-     */
-    public final synchronized void removeAndCloseAllListeners() {
-        final long stampNotifications = notificationListenersLock.writeLock();
-        final long stampDataChanges = dataChangeListenersLock.writeLock();
-        try {
-            removeAndCloseAllDataChangeListenersTemplate();
-            removeAndCloseAllNotificationListenersTemplate();
-        } finally {
-            dataChangeListenersLock.unlockWrite(stampDataChanges);
-            notificationListenersLock.unlockWrite(stampNotifications);
-        }
-    }
-
-    /**
-     * Closes and removes all data-change listeners.
-     */
-    public final void removeAndCloseAllDataChangeListeners() {
-        final long stamp = dataChangeListenersLock.writeLock();
-        try {
-            removeAndCloseAllDataChangeListenersTemplate();
-        } finally {
-            dataChangeListenersLock.unlockWrite(stamp);
-        }
-    }
-
-    @SuppressWarnings("checkstyle:IllegalCatch")
-    private void removeAndCloseAllDataChangeListenersTemplate() {
-        dataChangeListeners.values().forEach(listenerAdapter -> {
-            try {
-                listenerAdapter.close();
-            } catch (Exception e) {
-                LOG.error("Failed to close data-change listener {}.", listenerAdapter, e);
-                throw new IllegalStateException("Failed to close data-change listener %s.".formatted(listenerAdapter),
-                    e);
-            }
-        });
-        dataChangeListeners.clear();
-    }
-
-    /**
-     * Closes and removes all notification listeners.
+     * @param stream Stream to remove
      */
-    public final void removeAndCloseAllNotificationListeners() {
-        final long stamp = notificationListenersLock.writeLock();
-        try {
-            removeAndCloseAllNotificationListenersTemplate();
-        } finally {
-            notificationListenersLock.unlockWrite(stamp);
-        }
-    }
-
-    @SuppressWarnings("checkstyle:IllegalCatch")
-    private void removeAndCloseAllNotificationListenersTemplate() {
-        notificationListeners.values().forEach(listenerAdapter -> {
-            try {
-                listenerAdapter.close();
-            } catch (Exception e) {
-                LOG.error("Failed to close notification listener {}.", listenerAdapter, e);
-                throw new IllegalStateException("Failed to close notification listener %s.".formatted(listenerAdapter),
-                    e);
+    final void removeStream(final DOMDataBroker dataBroker, final AbstractStream<?> stream) {
+        // Defensive check to see if we are still tracking the stream
+        final var streamName = stream.getStreamName();
+        if (streams.get(streamName) != stream) {
+            LOG.warn("Stream {} does not match expected instance {}, skipping datastore update", streamName, stream);
+            return;
+        }
+
+        // Now issue a delete operation while the name is still protected by being associated in the map.
+        final var tx = dataBroker.newWriteOnlyTransaction();
+        tx.delete(LogicalDatastoreType.OPERATIONAL, RestconfStateStreams.restconfStateStreamPath(streamName));
+        tx.commit().addCallback(new FutureCallback<CommitInfo>() {
+            @Override
+            public void onSuccess(final CommitInfo result) {
+                LOG.debug("Stream {} removed", streamName);
+                streams.remove(streamName, stream);
             }
-        });
-        notificationListeners.clear();
-    }
-
-    /**
-     * Removes and closes data-change listener of type {@link ListenerAdapter} specified in parameter.
-     *
-     * @param listener Listener to be closed and removed.
-     */
-    @SuppressWarnings("checkstyle:IllegalCatch")
-    public final void removeAndCloseDataChangeListener(final ListenerAdapter listener) {
-        final long stamp = dataChangeListenersLock.writeLock();
-        try {
-            removeAndCloseDataChangeListenerTemplate(listener);
-        } catch (Exception exception) {
-            LOG.error("Data-change listener {} cannot be closed.", listener, exception);
-        } finally {
-            dataChangeListenersLock.unlockWrite(stamp);
-        }
-    }
 
-    /**
-     * Removes and closes data-change listener of type {@link ListenerAdapter} specified in parameter.
-     *
-     * @param listener Listener to be closed and removed.
-     */
-    private void removeAndCloseDataChangeListenerTemplate(final ListenerAdapter listener) {
-        try {
-            requireNonNull(listener).close();
-            if (dataChangeListeners.inverse().remove(listener) == null) {
-                LOG.warn("There isn't any data-change event stream that would match listener adapter {}.", listener);
+            @Override
+            public void onFailure(final Throwable cause) {
+                LOG.warn("Failed to remove stream {}, operational datastore may be inconsistent", streamName, cause);
+                streams.remove(streamName, stream);
             }
-        } catch (InterruptedException | ExecutionException e) {
-            LOG.error("Data-change listener {} cannot be closed.", listener, e);
-            throw new IllegalStateException("Data-change listener %s cannot be closed.".formatted(listener), e);
-        }
-    }
-
-    /**
-     * Removes and closes notification listener of type {@link NotificationListenerAdapter} specified in parameter.
-     *
-     * @param listener Listener to be closed and removed.
-     */
-    @SuppressWarnings("checkstyle:IllegalCatch")
-    public final void removeAndCloseNotificationListener(final NotificationListenerAdapter listener) {
-        final long stamp = notificationListenersLock.writeLock();
-        try {
-            removeAndCloseNotificationListenerTemplate(listener);
-        } catch (Exception e) {
-            LOG.error("Notification listener {} cannot be closed.", listener, e);
-        } finally {
-            notificationListenersLock.unlockWrite(stamp);
-        }
-    }
-
-    /**
-     * Removes and closes device notification listener of type {@link NotificationListenerAdapter}
-     * specified in parameter.
-     *
-     * @param listener Listener to be closed and removed.
-     */
-    @SuppressWarnings("checkstyle:IllegalCatch")
-    public final void removeAndCloseDeviceNotificationListener(final DeviceNotificationListenerAdaptor listener) {
-        final long stamp = deviceNotificationListenersLock.writeLock();
-        try {
-            requireNonNull(listener);
-            if (deviceNotificationListeners.inverse().remove(listener) == null) {
-                LOG.warn("There isn't any device notification stream that would match listener adapter {}.", listener);
-            }
-        } catch (final Exception exception) {
-            LOG.error("Device Notification listener {} cannot be closed.", listener, exception);
-        } finally {
-            deviceNotificationListenersLock.unlockWrite(stamp);
-        }
-    }
-
-    private void removeAndCloseNotificationListenerTemplate(final NotificationListenerAdapter listener) {
-        try {
-            requireNonNull(listener).close();
-            if (notificationListeners.inverse().remove(listener) == null) {
-                LOG.warn("There isn't any notification stream that would match listener adapter {}.", listener);
-            }
-        } catch (InterruptedException | ExecutionException e) {
-            LOG.error("Notification listener {} cannot be closed.", listener, e);
-            throw new IllegalStateException("Notification listener %s cannot be closed.".formatted(listener), e);
-        }
-    }
-
-    /**
-     * Removal and closing of general listener (data-change or notification listener).
-     *
-     * @param stream Stream to be closed and removed from cache.
-     */
-    final void removeAndCloseListener(final AbstractStream<?> stream) {
-        requireNonNull(stream);
-        if (stream instanceof ListenerAdapter dataChange) {
-            removeAndCloseDataChangeListener(dataChange);
-        } else if (stream instanceof NotificationListenerAdapter notification) {
-            removeAndCloseNotificationListener(notification);
-        }
+        }, MoreExecutors.directExecutor());
     }
 
     /**
@@ -567,22 +285,22 @@ public abstract sealed class ListenersBroker {
      * @param uri URI for creation of stream name.
      * @return String representation of stream name.
      */
-    private static String createStreamNameFromUri(final String uri) {
-        String result = requireNonNull(uri);
-        while (true) {
-            if (result.startsWith(URLConstants.BASE_PATH)) {
-                result = result.substring(URLConstants.BASE_PATH.length());
-            } else if (result.startsWith("/")) {
-                result = result.substring(1);
-            } else {
-                break;
-            }
-        }
-        if (result.endsWith("/")) {
-            result = result.substring(0, result.length() - 1);
-        }
-        return result;
-    }
+//    private static String createStreamNameFromUri(final String uri) {
+//        String result = requireNonNull(uri);
+//        while (true) {
+//            if (result.startsWith(URLConstants.BASE_PATH)) {
+//                result = result.substring(URLConstants.BASE_PATH.length());
+//            } else if (result.startsWith("/")) {
+//                result = result.substring(1);
+//            } else {
+//                break;
+//            }
+//        }
+//        if (result.endsWith("/")) {
+//            result = result.substring(0, result.length() - 1);
+//        }
+//        return result;
+//    }
 
     /**
      * Prepare URL from base name and stream name.
@@ -593,97 +311,20 @@ public abstract sealed class ListenersBroker {
      */
     public abstract @NonNull URI prepareUriByStreamName(UriInfo uriInfo, String streamName);
 
-    /**
-     * Register listener by streamName in identifier to listen to yang notifications, and put or delete information
-     * about listener to DS according to ietf-restconf-monitoring.
-     *
-     * @param identifier              Name of the stream.
-     * @param uriInfo                 URI information.
-     * @param notificationQueryParams Query parameters of notification.
-     * @param handlersHolder          Holder of handlers for notifications.
-     * @return Stream location for listening.
-     */
-    public final @NonNull URI subscribeToYangStream(final String identifier, final UriInfo uriInfo,
-            final ReceiveEventsParams notificationQueryParams, final HandlersHolder handlersHolder) {
-        final String streamName = createStreamNameFromUri(identifier);
-        if (isNullOrEmpty(streamName)) {
-            throw new RestconfDocumentedException("Stream name is empty.", ErrorType.PROTOCOL, ErrorTag.INVALID_VALUE);
-        }
-
-        final var notificationListenerAdapter = notificationListenerFor(streamName);
-        if (notificationListenerAdapter == null) {
-            throw new RestconfDocumentedException("Stream with name %s was not found.".formatted(streamName),
-                ErrorType.PROTOCOL, ErrorTag.UNKNOWN_ELEMENT);
-        }
-
-        final URI uri = prepareUriByStreamName(uriInfo, streamName);
-        notificationListenerAdapter.setQueryParams(notificationQueryParams);
-        notificationListenerAdapter.listen(handlersHolder.notificationService());
-        final DOMDataBroker dataBroker = handlersHolder.dataBroker();
-        notificationListenerAdapter.setCloseVars(dataBroker, handlersHolder.databindProvider());
-        final MapEntryNode mapToStreams = RestconfStateStreams.notificationStreamEntry(streamName,
-            notificationListenerAdapter.qnames(), notificationListenerAdapter.getOutputType(), uri);
-
-        // FIXME: how does this correlate with the transaction notificationListenerAdapter.close() will do?
-        final DOMDataTreeWriteTransaction writeTransaction = dataBroker.newWriteOnlyTransaction();
-        writeDataToDS(writeTransaction, mapToStreams);
-        submitData(writeTransaction);
-        return uri;
-    }
-
-    /**
-     * Register listener by streamName in identifier to listen to data change notifications, and put or delete
-     * information about listener to DS according to ietf-restconf-monitoring.
-     *
-     * @param identifier              Identifier as stream name.
-     * @param uriInfo                 Base URI information.
-     * @param notificationQueryParams Query parameters of notification.
-     * @param handlersHolder          Holder of handlers for notifications.
-     * @return Location for listening.
-     */
-    public final URI subscribeToDataStream(final String identifier, final UriInfo uriInfo,
-            final ReceiveEventsParams notificationQueryParams, final HandlersHolder handlersHolder) {
-        final var streamName = createStreamNameFromUri(identifier);
-        final var listener = dataChangeListenerFor(streamName);
-        if (listener == null) {
-            throw new RestconfDocumentedException("No listener found for stream " + streamName,
-                ErrorType.APPLICATION, ErrorTag.DATA_MISSING);
-        }
-
-        listener.setQueryParams(notificationQueryParams);
-
-        final var dataBroker = handlersHolder.dataBroker();
-        final var schemaHandler = handlersHolder.databindProvider();
-        listener.setCloseVars(dataBroker, schemaHandler);
-        listener.listen(dataBroker);
-
-        final var uri = prepareUriByStreamName(uriInfo, streamName);
-        final var schemaContext = schemaHandler.currentContext().modelContext();
-        final var serializedPath = IdentifierCodec.serialize(listener.getPath(), schemaContext);
-
-        final var mapToStreams = RestconfStateStreams.dataChangeStreamEntry(listener.getPath(),
-                listener.getOutputType(), uri, schemaContext, serializedPath);
-        final var writeTransaction = dataBroker.newWriteOnlyTransaction();
-        writeDataToDS(writeTransaction, mapToStreams);
-        submitData(writeTransaction);
-        return uri;
-    }
-
     // FIXME: callers are utter duplicates, refactor them
-    private static void writeDataToDS(final DOMDataTreeWriteOperations tx, final MapEntryNode mapToStreams) {
-        // FIXME: use put() here
-        tx.merge(LogicalDatastoreType.OPERATIONAL, RestconfStateStreams.restconfStateStreamPath(mapToStreams.name()),
-            mapToStreams);
-    }
-
-    private static void submitData(final DOMDataTreeWriteTransaction readWriteTransaction) {
-        try {
-            readWriteTransaction.commit().get();
-        } catch (final InterruptedException | ExecutionException e) {
-            throw new RestconfDocumentedException("Problem while putting data to DS.", e);
-        }
-    }
-
+//    private static void writeDataToDS(final DOMDataTreeWriteOperations tx, final MapEntryNode mapToStreams) {
+//        // FIXME: use put() here
+//        tx.merge(LogicalDatastoreType.OPERATIONAL, RestconfStateStreams.restconfStateStreamPath(mapToStreams.name()),
+//            mapToStreams);
+//    }
+//
+//    private static void submitData(final DOMDataTreeWriteTransaction readWriteTransaction) {
+//        try {
+//            readWriteTransaction.commit().get();
+//        } catch (final InterruptedException | ExecutionException e) {
+//            throw new RestconfDocumentedException("Problem while putting data to DS.", e);
+//        }
+//    }
 
     /**
      * Create data-change-event stream with POST operation via RPC.
@@ -716,10 +357,11 @@ public abstract sealed class ListenersBroker {
     public final RestconfFuture<Optional<ContainerNode>> createDataChangeNotifiStream(final ContainerNode input,
             final EffectiveModelContext modelContext) {
         final var datastoreName = extractStringLeaf(input, DATASTORE_NODEID);
-        final var scopeName = extractStringLeaf(input, SCOPE_NODEID);
-        final var adapter = registerDataChangeListener(modelContext,
-            datastoreName != null ? LogicalDatastoreType.valueOf(datastoreName) : LogicalDatastoreType.CONFIGURATION,
-            preparePath(input), scopeName != null ? Scope.ofName(scopeName) : Scope.BASE, prepareOutputType(input));
+        final var datastore = datastoreName != null ? LogicalDatastoreType.valueOf(datastoreName)
+            : LogicalDatastoreType.CONFIGURATION;
+        final var path = preparePath(input);
+        final var outputType = prepareOutputType(input);
+        final var adapter = createStream(name -> new ListenerAdapter(name, outputType, this, datastore, path));
 
         // building of output
         return RestconfFuture.of(Optional.of(Builders.containerBuilder()
@@ -728,6 +370,44 @@ public abstract sealed class ListenersBroker {
             .build()));
     }
 
+//    /**
+//     * Register listener by streamName in identifier to listen to data change notifications, and put or delete
+//     * information about listener to DS according to ietf-restconf-monitoring.
+//     *
+//     * @param identifier              Identifier as stream name.
+//     * @param uriInfo                 Base URI information.
+//     * @param notificationQueryParams Query parameters of notification.
+//     * @param handlersHolder          Holder of handlers for notifications.
+//     * @return Location for listening.
+//     */
+//    public final URI subscribeToDataStream(final String identifier, final UriInfo uriInfo,
+//            final ReceiveEventsParams notificationQueryParams, final HandlersHolder handlersHolder) {
+//        final var streamName = createStreamNameFromUri(identifier);
+//        final var listener = dataChangeListenerFor(streamName);
+//        if (listener == null) {
+//            throw new RestconfDocumentedException("No listener found for stream " + streamName,
+//                ErrorType.APPLICATION, ErrorTag.DATA_MISSING);
+//        }
+//
+//        listener.setQueryParams(notificationQueryParams);
+//
+//        final var dataBroker = handlersHolder.dataBroker();
+//        final var schemaHandler = handlersHolder.databindProvider();
+//        listener.setCloseVars(dataBroker, schemaHandler);
+//        listener.listen(dataBroker);
+//
+//        final var uri = prepareUriByStreamName(uriInfo, streamName);
+//        final var schemaContext = schemaHandler.currentContext().modelContext();
+//        final var serializedPath = IdentifierCodec.serialize(listener.getPath(), schemaContext);
+//
+//        final var mapToStreams = RestconfStateStreams.dataChangeStreamEntry(listener.getPath(),
+//                listener.getOutputType(), uri, schemaContext, serializedPath);
+//        final var writeTransaction = dataBroker.newWriteOnlyTransaction();
+//        writeDataToDS(writeTransaction, mapToStreams);
+//        submitData(writeTransaction);
+//        return uri;
+//    }
+
     // FIXME: this really should be a normal RPC implementation
     public final RestconfFuture<Optional<ContainerNode>> createNotificationStream(final ContainerNode input,
             final EffectiveModelContext modelContext) {
@@ -744,8 +424,28 @@ public abstract sealed class ListenersBroker {
             }
         }
 
+// FIXME: use this block to create a stream description
+//        final var module = refSchemaCtx.findModuleStatement(qname.getModule())
+//            .orElseThrow(() -> new RestconfDocumentedException(qname + " refers to an unknown module",
+//                ErrorType.APPLICATION, ErrorTag.INVALID_VALUE));
+//        final var stmt = module.findSchemaTreeNode(qname)
+//            .orElseThrow(() -> new RestconfDocumentedException(qname + " refers to an notification",
+//                ErrorType.APPLICATION, ErrorTag.INVALID_VALUE));
+//        if (!(stmt instanceof NotificationEffectiveStatement)) {
+//            throw new RestconfDocumentedException(qname + " refers to a non-notification",
+//                ErrorType.APPLICATION, ErrorTag.INVALID_VALUE);
+//        }
+//
+//        if (haveFirst) {
+//            sb.append(',');
+//        } else {
+//            haveFirst = true;
+//        }
+//        sb.append(module.argument().getLocalName()).append(':').append(qname.getLocalName());
+
         // registration of the listener
-        final var adapter = registerNotificationListener(modelContext, qnames, prepareOutputType(input));
+        final var outputType = prepareOutputType(input);
+        final var adapter = createStream(name -> new NotificationListenerAdapter(name, outputType, this, qnames));
 
         return RestconfFuture.of(Optional.of(Builders.containerBuilder()
             .withNodeIdentifier(SAL_REMOTE_OUTPUT_NODEID)
@@ -753,6 +453,44 @@ public abstract sealed class ListenersBroker {
             .build()));
     }
 
+    /**
+     * Register listener by streamName in identifier to listen to yang notifications, and put or delete information
+     * about listener to DS according to ietf-restconf-monitoring.
+     *
+     * @param identifier              Name of the stream.
+     * @param uriInfo                 URI information.
+     * @param notificationQueryParams Query parameters of notification.
+     * @param handlersHolder          Holder of handlers for notifications.
+     * @return Stream location for listening.
+     */
+//    public final @NonNull URI subscribeToYangStream(final String identifier, final UriInfo uriInfo,
+//            final ReceiveEventsParams notificationQueryParams, final HandlersHolder handlersHolder) {
+//        final String streamName = createStreamNameFromUri(identifier);
+//        if (isNullOrEmpty(streamName)) {
+//            throw new RestconfDocumentedException("Stream name is empty", ErrorType.PROTOCOL, ErrorTag.INVALID_VALUE);
+//        }
+//
+//        final var notificationListenerAdapter = notificationListenerFor(streamName);
+//        if (notificationListenerAdapter == null) {
+//            throw new RestconfDocumentedException("Stream with name %s was not found".formatted(streamName),
+//                ErrorType.PROTOCOL, ErrorTag.UNKNOWN_ELEMENT);
+//        }
+//
+//        final URI uri = prepareUriByStreamName(uriInfo, streamName);
+//        notificationListenerAdapter.setQueryParams(notificationQueryParams);
+//        notificationListenerAdapter.listen(handlersHolder.notificationService());
+//        final DOMDataBroker dataBroker = handlersHolder.dataBroker();
+//        notificationListenerAdapter.setCloseVars(dataBroker, handlersHolder.databindProvider());
+//        final MapEntryNode mapToStreams = RestconfStateStreams.notificationStreamEntry(streamName,
+//            notificationListenerAdapter.qnames(), notificationListenerAdapter.getOutputType(), uri);
+//
+//        // FIXME: how does this correlate with the transaction notificationListenerAdapter.close() will do?
+//        final DOMDataTreeWriteTransaction writeTransaction = dataBroker.newWriteOnlyTransaction();
+//        writeDataToDS(writeTransaction, mapToStreams);
+//        submitData(writeTransaction);
+//        return uri;
+//    }
+
     /**
      * Create device notification stream.
      *
@@ -762,7 +500,7 @@ public abstract sealed class ListenersBroker {
      * @return {@link DOMRpcResult} - Output of RPC - example in JSON
      */
     // FIXME: this should be an RPC invocation
-    public final RestconfFuture<Optional<ContainerNode>> createDeviceNotificationListener(final ContainerNode input,
+    public final RestconfFuture<Optional<ContainerNode>> createDeviceNotificationStream(final ContainerNode input,
             final String baseUrl, final DOMMountPointService mountPointService) {
         // parsing out of container with settings and path
         // FIXME: ugly cast
@@ -779,7 +517,6 @@ public abstract sealed class ListenersBroker {
             throw new RestconfDocumentedException("Target list uses multiple keys", ErrorType.APPLICATION,
                 ErrorTag.INVALID_VALUE);
         }
-        final String deviceName = listId.values().iterator().next().toString();
 
         final DOMMountPoint mountPoint = mountPointService.getMountPoint(path)
             .orElseThrow(() -> new RestconfDocumentedException("Mount point not available", ErrorType.APPLICATION,
@@ -802,8 +539,13 @@ public abstract sealed class ListenersBroker {
                 ErrorTag.OPERATION_FAILED);
         }
 
-        final var notificationListenerAdapter = registerDeviceNotificationListener(deviceName,
-            prepareOutputType(input), mountModelContext, mountPointService, mountPoint.getIdentifier());
+// FIXME: use this for description?
+//        final String deviceName = listId.values().iterator().next().toString();
+
+        final var outputType = prepareOutputType(input);
+        final var notificationListenerAdapter = createStream(
+            streamName -> new DeviceNotificationListenerAdaptor(streamName, outputType, this, mountModelContext,
+                mountPointService, mountPoint.getIdentifier()));
         notificationListenerAdapter.listen(mountNotifService, notificationPaths);
 
         return RestconfFuture.of(Optional.of(Builders.containerBuilder()
diff --git a/restconf/restconf-nb/src/main/java/org/opendaylight/restconf/nb/rfc8040/streams/RestconfStreamsConstants.java b/restconf/restconf-nb/src/main/java/org/opendaylight/restconf/nb/rfc8040/streams/RestconfStreamsConstants.java
deleted file mode 100644 (file)
index beb4d9e..0000000
+++ /dev/null
@@ -1,20 +0,0 @@
-/*
- * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-package org.opendaylight.restconf.nb.rfc8040.streams;
-
-/**
- * Constants for streams.
- */
-public final class RestconfStreamsConstants {
-    public static final String DATASTORE_PARAM_NAME = "datastore";
-    public static final String SCOPE_PARAM_NAME = "scope";
-
-    private RestconfStreamsConstants() {
-        // Hidden on purpose
-    }
-}
index cab53b166a9e46d703c3d4dca917f9c4988e2e92..d1c85ce7fc58555390334ec3b462b47780ede3e7 100644 (file)
@@ -7,6 +7,7 @@
  */
 package org.opendaylight.restconf.nb.rfc8040.streams;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.CharMatcher;
 import com.google.common.base.Strings;
 import java.util.concurrent.ScheduledExecutorService;
@@ -76,11 +77,17 @@ public final class SSESessionHandler implements StreamSessionHandler {
     /**
      * Handling of SSE session close event. Removal of subscription at listener and stopping of the ping process.
      */
-    public synchronized void close() {
+    @VisibleForTesting
+    synchronized void close() {
         listener.removeSubscriber(this);
         stopPingProcess();
     }
 
+    @Override
+    public synchronized boolean isConnected() {
+        return !sink.isClosed();
+    }
+
     /**
      * Sending of string message to outbound Server-Sent Events channel {@link SseEventSink}. SSE is automatically split
      * to fragments with new line character. If the maximum fragment length is set to non-zero positive value and input
@@ -104,6 +111,12 @@ public final class SSESessionHandler implements StreamSessionHandler {
         }
     }
 
+    @Override
+    public synchronized void endOfStream() {
+        stopPingProcess();
+        sink.close();
+    }
+
     /**
      * Split message to fragments. SSE automatically fragment string with new line character.
      * For manual fragmentation we will remove all new line characters
@@ -136,11 +149,6 @@ public final class SSESessionHandler implements StreamSessionHandler {
         }
     }
 
-    @Override
-    public synchronized boolean isConnected() {
-        return !sink.isClosed();
-    }
-
     // TODO:return some type of identification of connection
     @Override
     public String toString() {
index 7cd00423a2c312bb36ed4e3c13a36ac9e628fa2f..5f588365fa9f72ae007fa9eba5d80474384a98b2 100644 (file)
@@ -22,4 +22,9 @@ public interface StreamSessionHandler {
      * @param data Message data to be send.
      */
     void sendDataMessage(String data);
+
+    /**
+     * Called when the stream has reached its end. The handler should close all underlying resources.
+     */
+    void endOfStream();
 }
index dc8bf337edef067bd81072489e5f7dee99024eaa..c09e4370e3a87d36c6f857970a953ce2fa243953 100644 (file)
@@ -55,7 +55,7 @@ record WebSocketFactory(
         final var path = req.getRequestURI().getPath();
         if (path.startsWith(STREAMS_PREFIX)) {
             final var streamName = path.substring(STREAMS_PREFIX.length());
-            final var listener = listenersBroker.listenerFor(streamName);
+            final var listener = listenersBroker.getStream(streamName);
             if (listener != null) {
                 LOG.debug("Listener for stream with name {} has been found, web-socket session handler will be created",
                     streamName);
index 700624b823c2605501336779251daf6320c975e0..6ce653141e6601eb950594a0a52bb9cc7bc92956 100644 (file)
@@ -144,6 +144,14 @@ public final class WebSocketSessionHandler implements StreamSessionHandler {
         }
     }
 
+    @Override
+    public synchronized void endOfStream() {
+        if (session != null && session.isOpen()) {
+            session.close();
+        }
+        stopPingProcess();
+    }
+
     /**
      * Sensing of string message to remote endpoint of {@link org.eclipse.jetty.websocket.api.Session}. If the maximum
      * fragment length is set to non-zero positive value and input message exceeds this value, message is fragmented
@@ -159,7 +167,7 @@ public final class WebSocketSessionHandler implements StreamSessionHandler {
         }
 
         if (session != null && session.isOpen()) {
-            final RemoteEndpoint remoteEndpoint = session.getRemote();
+            final var remoteEndpoint = session.getRemote();
             if (maximumFragmentLength == 0 || message.length() <= maximumFragmentLength) {
                 sendDataMessage(message, remoteEndpoint);
             } else {
@@ -206,7 +214,7 @@ public final class WebSocketSessionHandler implements StreamSessionHandler {
     }
 
     private static List<String> splitMessageToFragments(final String inputMessage, final int maximumFragmentLength) {
-        final List<String> parts = new ArrayList<>();
+        final var parts = new ArrayList<String>();
         int length = inputMessage.length();
         for (int i = 0; i < length; i += maximumFragmentLength) {
             parts.add(inputMessage.substring(i, Math.min(length, i + maximumFragmentLength)));
index 15db6deab1fd3fe399aa001ce219f2a558b9773f..30ca670ddbcaccde611e3cb5a8c5574bc469cf29 100644 (file)
@@ -7,6 +7,7 @@
  */
 package org.opendaylight.restconf.nb.rfc8040.streams;
 
+import com.google.common.annotations.VisibleForTesting;
 import java.io.IOException;
 import java.io.StringWriter;
 import java.time.Instant;
@@ -18,8 +19,8 @@ import org.opendaylight.yangtools.yang.data.codec.xml.XMLStreamNormalizedNodeStr
 import org.opendaylight.yangtools.yang.model.api.EffectiveModelContext;
 
 final class XMLNotificationFormatter extends NotificationFormatter {
-    private static final XMLNotificationFormatter EMPTY = new XMLNotificationFormatter(TextParameters.EMPTY);
-
+    @VisibleForTesting
+    static final XMLNotificationFormatter EMPTY = new XMLNotificationFormatter(TextParameters.EMPTY);
     static final NotificationFormatterFactory FACTORY = new NotificationFormatterFactory(EMPTY) {
         @Override
         XMLNotificationFormatter newFormatter(final TextParameters textParams) {
@@ -7,18 +7,16 @@
  */
 package org.opendaylight.restconf.nb.rfc8040.streams;
 
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
-import static org.mockito.Mockito.mock;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.mockito.Mockito.when;
 
-import com.google.common.collect.ImmutableSet;
 import java.time.Instant;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.mockito.junit.MockitoJUnitRunner;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
 import org.opendaylight.mdsal.dom.api.DOMNotification;
-import org.opendaylight.yang.gen.v1.urn.sal.restconf.event.subscription.rev140708.NotificationOutputTypeGrouping.NotificationOutputType;
 import org.opendaylight.yangtools.yang.common.QName;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifier;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifierWithPredicates;
@@ -29,30 +27,23 @@ import org.opendaylight.yangtools.yang.data.api.schema.MapEntryNode;
 import org.opendaylight.yangtools.yang.data.impl.schema.Builders;
 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
 import org.opendaylight.yangtools.yang.model.api.stmt.SchemaNodeIdentifier.Absolute;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
-@RunWith(MockitoJUnitRunner.StrictStubs.class)
-public class JsonNotificationListenerTest extends AbstractNotificationListenerTest {
-    private static final Logger LOG = LoggerFactory.getLogger(JsonNotificationListenerTest.class);
-
-    private final ListenersBroker listenersBroker = new ListenersBroker.ServerSentEvents();
+@ExtendWith(MockitoExtension.class)
+class JSONNotificationFormatterTest extends AbstractNotificationListenerTest {
+    @Mock
+    private DOMNotification notificationData;
 
     @Test
-    public void notifi_leafTest() throws Exception {
+    void notifi_leafTest() throws Exception {
         final QName schemaPathNotifi = QName.create(MODULE, "notifi-leaf");
 
-        final DOMNotification notificationData = mock(DOMNotification.class);
-
-        final LeafNode<String> leaf = mockLeaf(QName.create(MODULE, "lf"));
-        final ContainerNode notifiBody = mockCont(schemaPathNotifi, leaf);
+        final var leaf = mockLeaf(QName.create(MODULE, "lf"));
+        final var notifiBody = mockCont(schemaPathNotifi, leaf);
 
         when(notificationData.getType()).thenReturn(Absolute.of(schemaPathNotifi));
         when(notificationData.getBody()).thenReturn(notifiBody);
 
-        final String result = prepareJson(notificationData, schemaPathNotifi);
-
-        LOG.info("json result: {}", result);
+        final String result = prepareJson(schemaPathNotifi);
 
         assertTrue(result.contains("ietf-restconf:notification"));
         assertTrue(result.contains("event-time"));
@@ -61,19 +52,17 @@ public class JsonNotificationListenerTest extends AbstractNotificationListenerTe
     }
 
     @Test
-    public void notifi_cont_leafTest() throws Exception {
+    void notifi_cont_leafTest() throws Exception {
         final QName schemaPathNotifi = QName.create(MODULE, "notifi-cont");
 
-        final DOMNotification notificationData = mock(DOMNotification.class);
-
-        final LeafNode<String> leaf = mockLeaf(QName.create(MODULE, "lf"));
-        final ContainerNode cont = mockCont(QName.create(MODULE, "cont"), leaf);
-        final ContainerNode notifiBody = mockCont(schemaPathNotifi, cont);
+        final var leaf = mockLeaf(QName.create(MODULE, "lf"));
+        final var cont = mockCont(QName.create(MODULE, "cont"), leaf);
+        final var notifiBody = mockCont(schemaPathNotifi, cont);
 
         when(notificationData.getType()).thenReturn(Absolute.of(schemaPathNotifi));
         when(notificationData.getBody()).thenReturn(notifiBody);
 
-        final String result = prepareJson(notificationData, schemaPathNotifi);
+        final String result = prepareJson(schemaPathNotifi);
 
         assertTrue(result.contains("ietf-restconf:notification"));
         assertTrue(result.contains("event-time"));
@@ -83,14 +72,12 @@ public class JsonNotificationListenerTest extends AbstractNotificationListenerTe
     }
 
     @Test
-    public void notifi_list_Test() throws Exception {
+    void notifi_list_Test() throws Exception {
         final QName schemaPathNotifi = QName.create(MODULE, "notifi-list");
 
-        final DOMNotification notificationData = mock(DOMNotification.class);
-
-        final LeafNode<String> leaf = mockLeaf(QName.create(MODULE, "lf"));
-        final MapEntryNode entry = mockMapEntry(QName.create(MODULE, "lst"), leaf);
-        final ContainerNode notifiBody = mockCont(schemaPathNotifi, Builders.mapBuilder()
+        final var leaf = mockLeaf(QName.create(MODULE, "lf"));
+        final var entry = mockMapEntry(QName.create(MODULE, "lst"), leaf);
+        final var notifiBody = mockCont(schemaPathNotifi, Builders.mapBuilder()
             .withNodeIdentifier(NodeIdentifier.create(QName.create(MODULE, "lst")))
             .withChild(entry)
             .build());
@@ -98,7 +85,7 @@ public class JsonNotificationListenerTest extends AbstractNotificationListenerTe
         when(notificationData.getType()).thenReturn(Absolute.of(schemaPathNotifi));
         when(notificationData.getBody()).thenReturn(notifiBody);
 
-        final String result = prepareJson(notificationData, schemaPathNotifi);
+        final String result = prepareJson(schemaPathNotifi);
 
         assertTrue(result.contains("ietf-restconf:notification"));
         assertTrue(result.contains("event-time"));
@@ -108,18 +95,16 @@ public class JsonNotificationListenerTest extends AbstractNotificationListenerTe
     }
 
     @Test
-    public void notifi_grpTest() throws Exception {
+    void notifi_grpTest() throws Exception {
         final QName schemaPathNotifi = QName.create(MODULE, "notifi-grp");
 
-        final DOMNotification notificationData = mock(DOMNotification.class);
-
-        final LeafNode<String> leaf = mockLeaf(QName.create(MODULE, "lf"));
-        final ContainerNode notifiBody = mockCont(schemaPathNotifi, leaf);
+        final var leaf = mockLeaf(QName.create(MODULE, "lf"));
+        final var notifiBody = mockCont(schemaPathNotifi, leaf);
 
         when(notificationData.getType()).thenReturn(Absolute.of(schemaPathNotifi));
         when(notificationData.getBody()).thenReturn(notifiBody);
 
-        final String result = prepareJson(notificationData, schemaPathNotifi);
+        final String result = prepareJson(schemaPathNotifi);
 
         assertTrue(result.contains("ietf-restconf:notification"));
         assertTrue(result.contains("event-time"));
@@ -127,18 +112,16 @@ public class JsonNotificationListenerTest extends AbstractNotificationListenerTe
     }
 
     @Test
-    public void notifi_augmTest() throws Exception {
+    void notifi_augmTest() throws Exception {
         final QName schemaPathNotifi = QName.create(MODULE, "notifi-augm");
 
-        final DOMNotification notificationData = mock(DOMNotification.class);
-
-        final LeafNode<String> leaf = mockLeaf(QName.create(MODULE, "lf-augm"));
-        final ContainerNode notifiBody = mockCont(schemaPathNotifi, leaf);
+        final var leaf = mockLeaf(QName.create(MODULE, "lf-augm"));
+        final var notifiBody = mockCont(schemaPathNotifi, leaf);
 
         when(notificationData.getType()).thenReturn(Absolute.of(schemaPathNotifi));
         when(notificationData.getBody()).thenReturn(notifiBody);
 
-        final String result = prepareJson(notificationData, schemaPathNotifi);
+        final String result = prepareJson(schemaPathNotifi);
 
         assertTrue(result.contains("ietf-restconf:notification"));
         assertTrue(result.contains("event-time"));
@@ -163,10 +146,8 @@ public class JsonNotificationListenerTest extends AbstractNotificationListenerTe
         return ImmutableNodes.leafNode(leafQName, "value");
     }
 
-    private String prepareJson(final DOMNotification notificationData, final QName schemaPathNotifi)
-            throws Exception {
-        final var ret = listenersBroker.registerNotificationListener(MODEL_CONTEXT, ImmutableSet.of(schemaPathNotifi),
-            NotificationOutputType.JSON).formatter().eventData(MODEL_CONTEXT, notificationData, Instant.now());
+    private String prepareJson(final QName schemaPathNotifi) throws Exception {
+        final var ret = JSONNotificationFormatter.EMPTY.eventData(MODEL_CONTEXT, notificationData, Instant.now());
         assertNotNull(ret);
         return ret;
     }
index d87f8542106cbe1c974d1707cfc831e66218c002..ae3459db2899c3adb6c343d438233df505db84e2 100644 (file)
@@ -7,59 +7,61 @@
  */
 package org.opendaylight.restconf.nb.rfc8040.streams;
 
-import static org.hamcrest.CoreMatchers.instanceOf;
+import static org.hamcrest.CoreMatchers.startsWith;
 import static org.hamcrest.MatcherAssert.assertThat;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertInstanceOf;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
 
-import java.util.function.Function;
-import org.junit.BeforeClass;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.mockito.junit.MockitoJUnitRunner;
+import java.util.UUID;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.junit.jupiter.MockitoExtension;
 import org.opendaylight.restconf.common.errors.RestconfDocumentedException;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.remote.rev140114.CreateDataChangeEventSubscriptionOutput;
 import org.opendaylight.yangtools.yang.common.ErrorTag;
 import org.opendaylight.yangtools.yang.common.ErrorType;
 import org.opendaylight.yangtools.yang.common.QName;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifier;
 import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
+import org.opendaylight.yangtools.yang.data.api.schema.LeafNode;
 import org.opendaylight.yangtools.yang.data.impl.schema.Builders;
 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
 import org.opendaylight.yangtools.yang.model.api.ContainerLike;
-import org.opendaylight.yangtools.yang.model.api.DataSchemaNode;
 import org.opendaylight.yangtools.yang.model.api.EffectiveModelContext;
 import org.opendaylight.yangtools.yang.model.api.LeafSchemaNode;
-import org.opendaylight.yangtools.yang.model.api.Module;
 import org.opendaylight.yangtools.yang.model.api.RpcDefinition;
 import org.opendaylight.yangtools.yang.test.util.YangParserTestUtils;
 
-@RunWith(MockitoJUnitRunner.StrictStubs.class)
-public class ListenersBrokerTest {
-    private static EffectiveModelContext SCHEMA_CTX;
+@ExtendWith(MockitoExtension.class)
+class ListenersBrokerTest {
+    private static final EffectiveModelContext SCHEMA_CTX = YangParserTestUtils.parseYangResourceDirectory("/streams");
 
     private final ListenersBroker listenersBroker = new ListenersBroker.ServerSentEvents();
 
-    @BeforeClass
-    public static void setUp() {
-        SCHEMA_CTX = YangParserTestUtils.parseYangResourceDirectory("/streams");
-    }
-
     @Test
-    public void createStreamTest() {
-        assertEquals(prepareDomPayload("create-data-change-event-subscription",
-            RpcDefinition::getOutput,
-            "data-change-event-subscription/toaster:toaster/datastore=CONFIGURATION/scope=BASE", "stream-name"),
-            listenersBroker.createDataChangeNotifiStream(
-                prepareDomPayload("create-data-change-event-subscription", RpcDefinition::getInput, "toaster", "path"),
-                SCHEMA_CTX).getOrThrow().orElseThrow());
+    void createStreamTest() {
+        final var output = assertInstanceOf(ContainerNode.class, listenersBroker.createDataChangeNotifiStream(
+            prepareDomPayload("create-data-change-event-subscription", "toaster", "path"),
+            SCHEMA_CTX).getOrThrow().orElse(null));
+
+        assertEquals(new NodeIdentifier(CreateDataChangeEventSubscriptionOutput.QNAME), output.name());
+        assertEquals(1, output.size());
+
+        final var streamName = assertInstanceOf(LeafNode.class,
+            output.childByArg(new NodeIdentifier(
+                QName.create(CreateDataChangeEventSubscriptionOutput.QNAME, "stream-name"))));
+        final var name = assertInstanceOf(String.class, streamName.body());
+        assertEquals(45, name.length());
+        assertThat(name, startsWith("urn:uuid:"));
+        assertNotNull(UUID.fromString(name.substring(9)));
     }
 
     @Test
-    public void createStreamWrongValueTest() {
-        final var payload = prepareDomPayload("create-data-change-event-subscription", RpcDefinition::getInput,
-            "String value", "path");
+    void createStreamWrongValueTest() {
+        final var payload = prepareDomPayload("create-data-change-event-subscription", "String value", "path");
         final var errors = assertThrows(RestconfDocumentedException.class,
             () -> listenersBroker.createDataChangeNotifiStream(payload, SCHEMA_CTX)).getErrors();
         assertEquals(1, errors.size());
@@ -70,9 +72,8 @@ public class ListenersBrokerTest {
     }
 
     @Test
-    public void createStreamWrongInputRpcTest() {
-        final var payload = prepareDomPayload("create-data-change-event-subscription2", RpcDefinition::getInput,
-            "toaster", "path2");
+    void createStreamWrongInputRpcTest() {
+        final var payload = prepareDomPayload("create-data-change-event-subscription2", "toaster", "path2");
         final var errors = assertThrows(RestconfDocumentedException.class,
             () -> listenersBroker.createDataChangeNotifiStream(payload, SCHEMA_CTX)).getErrors();
         assertEquals(1, errors.size());
@@ -82,24 +83,22 @@ public class ListenersBrokerTest {
         assertEquals("Instance identifier was not normalized correctly", error.getErrorMessage());
     }
 
-    private static ContainerNode prepareDomPayload(final String rpcName,
-            final Function<RpcDefinition, ContainerLike> rpcToContainer, final String toasterValue,
+    private static ContainerNode prepareDomPayload(final String rpcName, final String toasterValue,
             final String inputOutputName) {
-        final Module rpcModule = SCHEMA_CTX.findModules("sal-remote").iterator().next();
+        final var rpcModule = SCHEMA_CTX.findModules("sal-remote").iterator().next();
         final QName rpcQName = QName.create(rpcModule.getQNameModule(), rpcName);
 
         ContainerLike containerSchemaNode = null;
         for (final RpcDefinition rpc : rpcModule.getRpcs()) {
             if (rpcQName.isEqualWithoutRevision(rpc.getQName())) {
-                containerSchemaNode = rpcToContainer.apply(rpc);
+                containerSchemaNode = rpc.getInput();
                 break;
             }
         }
         assertNotNull(containerSchemaNode);
 
         final QName lfQName = QName.create(rpcModule.getQNameModule(), inputOutputName);
-        final DataSchemaNode lfSchemaNode = containerSchemaNode.getDataChildByName(lfQName);
-        assertThat(lfSchemaNode, instanceOf(LeafSchemaNode.class));
+        assertInstanceOf(LeafSchemaNode.class, containerSchemaNode.dataChildByName(lfQName));
 
         final Object o;
         if ("toaster".equals(toasterValue)) {
index 813d1334914643852a9e31901d3e0993990a979f..373bee8ce1e4ce39f0380d4530d7787362d10d19 100644 (file)
@@ -22,16 +22,12 @@ import org.junit.jupiter.api.extension.ExtendWith;
 import org.mockito.Mock;
 import org.mockito.junit.jupiter.MockitoExtension;
 import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
-import org.opendaylight.yang.gen.v1.urn.sal.restconf.event.subscription.rev140708.CreateDataChangeEventSubscriptionInput1.Scope;
-import org.opendaylight.yang.gen.v1.urn.sal.restconf.event.subscription.rev140708.NotificationOutputTypeGrouping;
+import org.opendaylight.yang.gen.v1.urn.sal.restconf.event.subscription.rev140708.NotificationOutputTypeGrouping.NotificationOutputType;
 import org.opendaylight.yangtools.yang.common.QName;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 
 @ExtendWith(MockitoExtension.class)
 class WebSocketFactoryTest extends AbstractNotificationListenerTest {
-    private static final String REGISTERED_STREAM_NAME = "data-change-event-subscription/"
-            + "toaster:toaster/datastore=CONFIGURATION/scope=SUBTREE/JSON";
-
     private final ListenersBroker listenersBroker = new ListenersBroker.ServerSentEvents();
 
     @Mock
@@ -42,19 +38,21 @@ class WebSocketFactoryTest extends AbstractNotificationListenerTest {
     private ServletUpgradeResponse upgradeResponse;
 
     private WebSocketFactory webSocketFactory;
+    private String streamName;
 
     @BeforeEach
     void prepareListenersBroker() {
         webSocketFactory = new WebSocketFactory(execService, listenersBroker, 5000, 2000);
 
-        listenersBroker.registerDataChangeListener(MODEL_CONTEXT, LogicalDatastoreType.CONFIGURATION,
-            YangInstanceIdentifier.of(QName.create("http://netconfcentral.org/ns/toaster", "2009-11-20", "toaster")),
-            Scope.SUBTREE, NotificationOutputTypeGrouping.NotificationOutputType.JSON);
+        streamName = listenersBroker.createStream(name -> new ListenerAdapter(name, NotificationOutputType.JSON,
+            listenersBroker, LogicalDatastoreType.CONFIGURATION,
+            YangInstanceIdentifier.of(QName.create("http://netconfcentral.org/ns/toaster", "2009-11-20", "toaster"))))
+            .getStreamName();
     }
 
     @Test
     void createWebSocketSuccessfully() {
-        doReturn(URI.create("https://localhost:8181/rests/streams/" + REGISTERED_STREAM_NAME))
+        doReturn(URI.create("https://localhost:8181/rests/streams/" + streamName))
             .when(upgradeRequest).getRequestURI();
 
         assertInstanceOf(WebSocketSessionHandler.class,
@@ -65,7 +63,7 @@ class WebSocketFactoryTest extends AbstractNotificationListenerTest {
 
     @Test
     void createWebSocketUnsuccessfully() {
-        doReturn(URI.create("https://localhost:8181/rests/streams/" + REGISTERED_STREAM_NAME + "/toasterStatus"))
+        doReturn(URI.create("https://localhost:8181/rests/streams/" + streamName + "/toasterStatus"))
             .when(upgradeRequest).getRequestURI();
 
         assertNull(webSocketFactory.createWebSocket(upgradeRequest, upgradeResponse));
@@ -7,17 +7,15 @@
  */
 package org.opendaylight.restconf.nb.rfc8040.streams;
 
-import static org.junit.Assert.assertNotNull;
-import static org.mockito.Mockito.mock;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.mockito.Mockito.when;
 
-import com.google.common.collect.ImmutableSet;
 import java.time.Instant;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.mockito.junit.MockitoJUnitRunner;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
 import org.opendaylight.mdsal.dom.api.DOMNotification;
-import org.opendaylight.yang.gen.v1.urn.sal.restconf.event.subscription.rev140708.NotificationOutputTypeGrouping.NotificationOutputType;
 import org.opendaylight.yangtools.yang.common.QName;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifier;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifierWithPredicates;
@@ -30,56 +28,51 @@ import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
 import org.opendaylight.yangtools.yang.model.api.stmt.SchemaNodeIdentifier.Absolute;
 import org.xmlunit.assertj.XmlAssert;
 
-@RunWith(MockitoJUnitRunner.StrictStubs.class)
-public class XmlNotificationListenerTest extends AbstractNotificationListenerTest {
-    private final ListenersBroker listenersBroker = new ListenersBroker.ServerSentEvents();
+@ExtendWith(MockitoExtension.class)
+class XMLNotificationFormatterTest extends AbstractNotificationListenerTest {
+    @Mock
+    private DOMNotification notificationData;
 
     @Test
-    public void notifi_leafTest() throws Exception {
+    void notifi_leafTest() throws Exception {
         final QName schemaPathNotifi = QName.create(MODULE, "notifi-leaf");
 
-        final DOMNotification notificationData = mock(DOMNotification.class);
-
-        final LeafNode<String> leaf = mockLeaf(QName.create(MODULE, "lf"));
-        final ContainerNode notifiBody = mockCont(schemaPathNotifi, leaf);
+        final var leaf = mockLeaf(QName.create(MODULE, "lf"));
+        final var notifiBody = mockCont(schemaPathNotifi, leaf);
 
         when(notificationData.getType()).thenReturn(Absolute.of(schemaPathNotifi));
         when(notificationData.getBody()).thenReturn(notifiBody);
 
-        assertXmlMatches(prepareXmlResult(notificationData, schemaPathNotifi), """
+        assertXmlMatches(prepareXmlResult(schemaPathNotifi), """
             <notification xmlns="urn:ietf:params:xml:ns:netconf:notification:1.0">\
             <eventTime>2020-06-29T14:23:46.086855+02:00</eventTime><notifi-leaf xmlns="notifi:mod">\
             <lf>value</lf></notifi-leaf></notification>""");
     }
 
     @Test
-    public void notifi_cont_leafTest() throws Exception {
+    void notifi_cont_leafTest() throws Exception {
         final QName schemaPathNotifi = QName.create(MODULE, "notifi-cont");
 
-        final DOMNotification notificationData = mock(DOMNotification.class);
-
-        final LeafNode<String> leaf = mockLeaf(QName.create(MODULE, "lf"));
-        final ContainerNode cont = mockCont(QName.create(MODULE, "cont"), leaf);
-        final ContainerNode notifiBody = mockCont(schemaPathNotifi, cont);
+        final var leaf = mockLeaf(QName.create(MODULE, "lf"));
+        final var cont = mockCont(QName.create(MODULE, "cont"), leaf);
+        final var notifiBody = mockCont(schemaPathNotifi, cont);
 
         when(notificationData.getType()).thenReturn(Absolute.of(schemaPathNotifi));
         when(notificationData.getBody()).thenReturn(notifiBody);
 
-        assertXmlMatches(prepareXmlResult(notificationData, schemaPathNotifi), """
+        assertXmlMatches(prepareXmlResult(schemaPathNotifi), """
             <notification xmlns="urn:ietf:params:xml:ns:netconf:notification:1.0">\
             <eventTime>2020-06-29T14:23:46.086855+02:00</eventTime><notifi-cont xmlns="notifi:mod">\
             <cont><lf>value</lf></cont></notifi-cont></notification>""");
     }
 
     @Test
-    public void notifi_list_Test() throws Exception {
+    void notifi_list_Test() throws Exception {
         final QName schemaPathNotifi = QName.create(MODULE, "notifi-list");
 
-        final DOMNotification notificationData = mock(DOMNotification.class);
-
-        final LeafNode<String> leaf = mockLeaf(QName.create(MODULE, "lf"));
-        final MapEntryNode entry = mockMapEntry(QName.create(MODULE, "lst"), leaf);
-        final ContainerNode notifiBody = mockCont(schemaPathNotifi, Builders.mapBuilder()
+        final var leaf = mockLeaf(QName.create(MODULE, "lf"));
+        final var entry = mockMapEntry(QName.create(MODULE, "lst"), leaf);
+        final var notifiBody = mockCont(schemaPathNotifi, Builders.mapBuilder()
             .withNodeIdentifier(NodeIdentifier.create(QName.create(MODULE, "lst")))
             .withChild(entry)
             .build());
@@ -87,43 +80,39 @@ public class XmlNotificationListenerTest extends AbstractNotificationListenerTes
         when(notificationData.getType()).thenReturn(Absolute.of(schemaPathNotifi));
         when(notificationData.getBody()).thenReturn(notifiBody);
 
-        assertXmlMatches(prepareXmlResult(notificationData, schemaPathNotifi), """
+        assertXmlMatches(prepareXmlResult(schemaPathNotifi), """
             <notification xmlns="urn:ietf:params:xml:ns:netconf:notification:1.0">\
             <eventTime>2020-06-29T14:23:46.086855+02:00</eventTime><notifi-list xmlns="notifi:mod">\
             <lst><lf>value</lf></lst></notifi-list></notification>""");
     }
 
     @Test
-    public void notifi_grpTest() throws Exception {
+    void notifi_grpTest() throws Exception {
         final QName schemaPathNotifi = QName.create(MODULE, "notifi-grp");
 
-        final DOMNotification notificationData = mock(DOMNotification.class);
-
-        final LeafNode<String> leaf = mockLeaf(QName.create(MODULE, "lf"));
-        final ContainerNode notifiBody = mockCont(schemaPathNotifi, leaf);
+        final var leaf = mockLeaf(QName.create(MODULE, "lf"));
+        final var notifiBody = mockCont(schemaPathNotifi, leaf);
 
         when(notificationData.getType()).thenReturn(Absolute.of(schemaPathNotifi));
         when(notificationData.getBody()).thenReturn(notifiBody);
 
-        assertXmlMatches(prepareXmlResult(notificationData, schemaPathNotifi), """
+        assertXmlMatches(prepareXmlResult(schemaPathNotifi), """
             <notification xmlns="urn:ietf:params:xml:ns:netconf:notification:1.0">\
             <eventTime>2020-06-29T14:23:46.086855+02:00</eventTime><notifi-grp xmlns="notifi:mod">\
             <lf>value</lf></notifi-grp></notification>""");
     }
 
     @Test
-    public void notifi_augmTest() throws Exception {
+    void notifi_augmTest() throws Exception {
         final QName schemaPathNotifi = QName.create(MODULE, "notifi-augm");
 
-        final DOMNotification notificationData = mock(DOMNotification.class);
-
-        final LeafNode<String> leaf = mockLeaf(QName.create(MODULE, "lf-augm"));
-        final ContainerNode notifiBody = mockCont(schemaPathNotifi, leaf);
+        final var leaf = mockLeaf(QName.create(MODULE, "lf-augm"));
+        final var notifiBody = mockCont(schemaPathNotifi, leaf);
 
         when(notificationData.getType()).thenReturn(Absolute.of(schemaPathNotifi));
         when(notificationData.getBody()).thenReturn(notifiBody);
 
-        assertXmlMatches(prepareXmlResult(notificationData, schemaPathNotifi), """
+        assertXmlMatches(prepareXmlResult(schemaPathNotifi), """
             <notification xmlns="urn:ietf:params:xml:ns:netconf:notification:1.0">\
             <eventTime>2020-06-29T14:23:46.086855+02:00</eventTime><notifi-augm xmlns="notifi:mod">\
             <lf-augm>value</lf-augm></notifi-augm></notification>""");
@@ -155,10 +144,8 @@ public class XmlNotificationListenerTest extends AbstractNotificationListenerTes
         return ImmutableNodes.leafNode(leafQName, "value");
     }
 
-    private String prepareXmlResult(final DOMNotification notificationData, final QName schemaPathNotifi)
-            throws Exception {
-        final var ret = listenersBroker.registerNotificationListener(MODEL_CONTEXT, ImmutableSet.of(schemaPathNotifi),
-            NotificationOutputType.XML).formatter().eventData(MODEL_CONTEXT, notificationData, Instant.now());
+    private String prepareXmlResult(final QName schemaPathNotifi) throws Exception {
+        final var ret = XMLNotificationFormatter.EMPTY.eventData(MODEL_CONTEXT, notificationData, Instant.now());
         assertNotNull(ret);
         return ret;
     }