Eliminate CreateStreamUtil 38/108838/5
authorRobert Varga <robert.varga@pantheon.tech>
Fri, 3 Nov 2023 09:44:47 +0000 (10:44 +0100)
committerRobert Varga <nite@hq.sk>
Fri, 3 Nov 2023 15:04:33 +0000 (15:04 +0000)
Integrate the functionality into ListenerBroker, before we end up
splitting it out.

JIRA: NETCONF-1102
Change-Id: I57fafd28bc602d81e06780515cf66c6e26627013
Signed-off-by: Robert Varga <robert.varga@pantheon.tech>
restconf/restconf-nb/src/main/java/org/opendaylight/restconf/nb/rfc8040/rests/services/impl/CreateStreamUtil.java [deleted file]
restconf/restconf-nb/src/main/java/org/opendaylight/restconf/nb/rfc8040/rests/services/impl/RestconfInvokeOperationsServiceImpl.java
restconf/restconf-nb/src/main/java/org/opendaylight/restconf/nb/rfc8040/streams/ListenersBroker.java
restconf/restconf-nb/src/test/java/org/opendaylight/restconf/nb/rfc8040/streams/ListenersBrokerTest.java [moved from restconf/restconf-nb/src/test/java/org/opendaylight/restconf/nb/rfc8040/rests/services/impl/CreateStreamUtilTest.java with 91% similarity]

diff --git a/restconf/restconf-nb/src/main/java/org/opendaylight/restconf/nb/rfc8040/rests/services/impl/CreateStreamUtil.java b/restconf/restconf-nb/src/main/java/org/opendaylight/restconf/nb/rfc8040/rests/services/impl/CreateStreamUtil.java
deleted file mode 100644 (file)
index d1b4791..0000000
+++ /dev/null
@@ -1,247 +0,0 @@
-/*
- * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-package org.opendaylight.restconf.nb.rfc8040.rests.services.impl;
-
-import com.google.common.collect.ImmutableSet;
-import java.util.Optional;
-import org.eclipse.jdt.annotation.Nullable;
-import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
-import org.opendaylight.mdsal.dom.api.DOMMountPoint;
-import org.opendaylight.mdsal.dom.api.DOMMountPointService;
-import org.opendaylight.mdsal.dom.api.DOMNotificationService;
-import org.opendaylight.mdsal.dom.api.DOMRpcResult;
-import org.opendaylight.mdsal.dom.api.DOMSchemaService;
-import org.opendaylight.restconf.common.errors.RestconfDocumentedException;
-import org.opendaylight.restconf.common.errors.RestconfFuture;
-import org.opendaylight.restconf.nb.rfc8040.streams.ListenersBroker;
-import org.opendaylight.restconf.nb.rfc8040.streams.RestconfStreamsConstants;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.device.notification.rev221106.SubscribeDeviceNotificationInput;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.device.notification.rev221106.SubscribeDeviceNotificationOutput;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.remote.rev140114.CreateDataChangeEventSubscriptionInput;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.remote.rev140114.CreateDataChangeEventSubscriptionOutput;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.remote.rev140114.CreateNotificationStreamInput;
-import org.opendaylight.yang.gen.v1.urn.sal.restconf.event.subscription.rev140708.CreateDataChangeEventSubscriptionInput1.Scope;
-import org.opendaylight.yang.gen.v1.urn.sal.restconf.event.subscription.rev140708.NotificationOutputTypeGrouping;
-import org.opendaylight.yang.gen.v1.urn.sal.restconf.event.subscription.rev140708.NotificationOutputTypeGrouping.NotificationOutputType;
-import org.opendaylight.yangtools.yang.common.ErrorTag;
-import org.opendaylight.yangtools.yang.common.ErrorType;
-import org.opendaylight.yangtools.yang.common.QName;
-import org.opendaylight.yangtools.yang.common.QNameModule;
-import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
-import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifier;
-import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifierWithPredicates;
-import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
-import org.opendaylight.yangtools.yang.data.api.schema.DataContainerChild;
-import org.opendaylight.yangtools.yang.data.api.schema.LeafNode;
-import org.opendaylight.yangtools.yang.data.api.schema.LeafSetEntryNode;
-import org.opendaylight.yangtools.yang.data.api.schema.LeafSetNode;
-import org.opendaylight.yangtools.yang.data.impl.schema.Builders;
-import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
-import org.opendaylight.yangtools.yang.model.api.EffectiveModelContext;
-import org.opendaylight.yangtools.yang.model.api.stmt.NotificationEffectiveStatement;
-import org.opendaylight.yangtools.yang.model.api.stmt.SchemaNodeIdentifier.Absolute;
-
-/**
- * Utility class for creation of data-change-event or YANG notification streams.
- */
-final class CreateStreamUtil {
-    private static final QNameModule SAL_REMOTE_AUGMENT = NotificationOutputTypeGrouping.QNAME.getModule();
-
-    private static final QNameModule DEVICE_NOTIFICATION_MODULE = SubscribeDeviceNotificationInput.QNAME.getModule();
-    private static final QName DATASTORE_QNAME =
-        QName.create(SAL_REMOTE_AUGMENT, RestconfStreamsConstants.DATASTORE_PARAM_NAME).intern();
-    private static final QName SCOPE_QNAME =
-        QName.create(SAL_REMOTE_AUGMENT, RestconfStreamsConstants.SCOPE_PARAM_NAME).intern();
-    private static final QName OUTPUT_TYPE_QNAME =
-        QName.create(SAL_REMOTE_AUGMENT, "notification-output-type").intern();
-    private static final QName DEVICE_NOTIFICATION_PATH_QNAME =
-        QName.create(DEVICE_NOTIFICATION_MODULE, "path").intern();
-    private static final QName DEVICE_NOTIFICATION_STREAM_PATH =
-        QName.create(DEVICE_NOTIFICATION_PATH_QNAME, "stream-path").intern();
-    private static final NodeIdentifier DATASTORE_NODEID = NodeIdentifier.create(DATASTORE_QNAME);
-    private static final NodeIdentifier SCOPE_NODEID = NodeIdentifier.create(SCOPE_QNAME);
-    private static final NodeIdentifier OUTPUT_TYPE_NODEID = NodeIdentifier.create(OUTPUT_TYPE_QNAME);
-    private static final NodeIdentifier DEVICE_NOTIFICATION_PATH_NODEID =
-        NodeIdentifier.create(DEVICE_NOTIFICATION_PATH_QNAME);
-    private static final NodeIdentifier SAL_REMOTE_OUTPUT_NODEID =
-        NodeIdentifier.create(CreateDataChangeEventSubscriptionOutput.QNAME);
-    private static final NodeIdentifier NOTIFICATIONS =
-        NodeIdentifier.create(QName.create(CreateNotificationStreamInput.QNAME, "notifications").intern());
-    private static final NodeIdentifier PATH_NODEID =
-        NodeIdentifier.create(QName.create(CreateDataChangeEventSubscriptionInput.QNAME, "path").intern());
-    private static final NodeIdentifier STREAM_NAME_NODEID =
-        NodeIdentifier.create(QName.create(CreateDataChangeEventSubscriptionOutput.QNAME, "stream-name").intern());
-
-    private CreateStreamUtil() {
-        // Hidden on purpose
-    }
-
-    /**
-     * Create data-change-event stream with POST operation via RPC.
-     *
-     * @param input Input of RPC - example in JSON (data-change-event stream):
-     *              <pre>
-     *              {@code
-     *                  {
-     *                      "input": {
-     *                          "path": "/toaster:toaster/toaster:toasterStatus",
-     *                          "sal-remote-augment:datastore": "OPERATIONAL",
-     *                          "sal-remote-augment:scope": "ONE"
-     *                      }
-     *                  }
-     *              }
-     *              </pre>
-     * @param modelContext Reference to {@link EffectiveModelContext}.
-     * @return {@link DOMRpcResult} - Output of RPC - example in JSON:
-     *     <pre>
-     *     {@code
-     *         {
-     *             "output": {
-     *                 "stream-name": "toaster:toaster/toaster:toasterStatus/datastore=OPERATIONAL/scope=ONE"
-     *             }
-     *         }
-     *     }
-     *     </pre>
-     */
-    // FIXME: this really should be a normal RPC implementation
-    static RestconfFuture<Optional<ContainerNode>> createDataChangeNotifiStream(final ListenersBroker listenersBroker,
-            final ContainerNode input, final EffectiveModelContext modelContext) {
-        final var datastoreName = extractStringLeaf(input, DATASTORE_NODEID);
-        final var scopeName = extractStringLeaf(input, SCOPE_NODEID);
-        final var adapter = listenersBroker.registerDataChangeListener(modelContext,
-            datastoreName != null ? LogicalDatastoreType.valueOf(datastoreName) : LogicalDatastoreType.CONFIGURATION,
-            preparePath(input), scopeName != null ? Scope.ofName(scopeName) : Scope.BASE, prepareOutputType(input));
-
-        // building of output
-        return RestconfFuture.of(Optional.of(Builders.containerBuilder()
-            .withNodeIdentifier(SAL_REMOTE_OUTPUT_NODEID)
-            .withChild(ImmutableNodes.leafNode(STREAM_NAME_NODEID, adapter.getStreamName()))
-            .build()));
-    }
-
-    // FIXME: this really should be a normal RPC implementation
-    static RestconfFuture<Optional<ContainerNode>> createNotificationStream(final ListenersBroker listenersBroker,
-            final ContainerNode input, final EffectiveModelContext modelContext) {
-        final var qnames = ((LeafSetNode<String>) input.getChildByArg(NOTIFICATIONS)).body().stream()
-            .map(LeafSetEntryNode::body)
-            .map(QName::create)
-            .sorted()
-            .collect(ImmutableSet.toImmutableSet());
-
-        for (var qname : qnames) {
-            if (modelContext.findNotification(qname).isEmpty()) {
-                throw new RestconfDocumentedException(qname + " refers to an unknown notification",
-                    ErrorType.APPLICATION, ErrorTag.INVALID_VALUE);
-            }
-        }
-
-        // registration of the listener
-        final var adapter = listenersBroker.registerNotificationListener(modelContext, qnames,
-            prepareOutputType(input));
-
-        return RestconfFuture.of(Optional.of(Builders.containerBuilder()
-            .withNodeIdentifier(SAL_REMOTE_OUTPUT_NODEID)
-            .withChild(ImmutableNodes.leafNode(STREAM_NAME_NODEID, adapter.getStreamName()))
-            .build()));
-    }
-
-    /**
-     * Create device notification stream.
-     *
-     * @param baseUrl base Url
-     * @param input RPC input
-     * @param mountPointService dom mount point service
-     * @param listenersBroker {@link ListenersBroker}
-     * @return {@link DOMRpcResult} - Output of RPC - example in JSON
-     */
-    // FIXME: this should be an RPC invocation
-    static RestconfFuture<Optional<ContainerNode>> createDeviceNotificationListener(
-            final ListenersBroker listenersBroker, final ContainerNode input, final String baseUrl,
-            final DOMMountPointService mountPointService) {
-        // parsing out of container with settings and path
-        // FIXME: ugly cast
-        final var path = (YangInstanceIdentifier) input.findChildByArg(DEVICE_NOTIFICATION_PATH_NODEID)
-                .map(DataContainerChild::body)
-                .orElseThrow(() -> new RestconfDocumentedException("No path specified", ErrorType.APPLICATION,
-                    ErrorTag.DATA_MISSING));
-
-        if (!(path.getLastPathArgument() instanceof NodeIdentifierWithPredicates listId)) {
-            throw new RestconfDocumentedException("Path does not refer to a list item", ErrorType.APPLICATION,
-                ErrorTag.INVALID_VALUE);
-        }
-        if (listId.size() != 1) {
-            throw new RestconfDocumentedException("Target list uses multiple keys", ErrorType.APPLICATION,
-                ErrorTag.INVALID_VALUE);
-        }
-        final String deviceName = listId.values().iterator().next().toString();
-
-        final DOMMountPoint mountPoint = mountPointService.getMountPoint(path)
-            .orElseThrow(() -> new RestconfDocumentedException("Mount point not available", ErrorType.APPLICATION,
-                ErrorTag.OPERATION_FAILED));
-
-        final DOMNotificationService mountNotifService = mountPoint.getService(DOMNotificationService.class)
-            .orElseThrow(() -> new RestconfDocumentedException("Mount point does not support notifications",
-                ErrorType.APPLICATION, ErrorTag.OPERATION_FAILED));
-
-        final var mountModelContext = mountPoint.getService(DOMSchemaService.class)
-            .orElseThrow(() -> new RestconfDocumentedException("Mount point schema not available",
-                ErrorType.APPLICATION, ErrorTag.OPERATION_FAILED))
-            .getGlobalContext();
-        final var notificationPaths = mountModelContext.getModuleStatements().values().stream()
-            .flatMap(module -> module.streamEffectiveSubstatements(NotificationEffectiveStatement.class))
-            .map(notification -> Absolute.of(notification.argument()))
-            .collect(ImmutableSet.toImmutableSet());
-        if (notificationPaths.isEmpty()) {
-            throw new RestconfDocumentedException("Device does not support notification", ErrorType.APPLICATION,
-                ErrorTag.OPERATION_FAILED);
-        }
-
-        final var notificationListenerAdapter = listenersBroker.registerDeviceNotificationListener(deviceName,
-            prepareOutputType(input), mountModelContext, mountPointService, mountPoint.getIdentifier());
-        notificationListenerAdapter.listen(mountNotifService, notificationPaths);
-
-        return RestconfFuture.of(Optional.of(Builders.containerBuilder()
-            .withNodeIdentifier(new NodeIdentifier(SubscribeDeviceNotificationOutput.QNAME))
-            .withChild(ImmutableNodes.leafNode(DEVICE_NOTIFICATION_STREAM_PATH,
-                baseUrl + notificationListenerAdapter.getStreamName()))
-            .build()));
-    }
-
-    /**
-     * Prepare {@link NotificationOutputType}.
-     *
-     * @param data Container with stream settings (RPC create-stream).
-     * @return Parsed {@link NotificationOutputType}.
-     */
-    private static NotificationOutputType prepareOutputType(final ContainerNode data) {
-        final String outputName = extractStringLeaf(data, OUTPUT_TYPE_NODEID);
-        return outputName != null ? NotificationOutputType.valueOf(outputName) : NotificationOutputType.XML;
-    }
-
-    /**
-     * Prepare {@link YangInstanceIdentifier} of stream source.
-     *
-     * @param data Container with stream settings (RPC create-stream).
-     * @return Parsed {@link YangInstanceIdentifier} of data element from which the data-change-event notifications
-     *         are going to be generated.
-     */
-    private static YangInstanceIdentifier preparePath(final ContainerNode data) {
-        final var pathLeaf = data.childByArg(PATH_NODEID);
-        if (pathLeaf != null && pathLeaf.body() instanceof YangInstanceIdentifier pathValue) {
-            return pathValue;
-        }
-
-        throw new RestconfDocumentedException("Instance identifier was not normalized correctly",
-            ErrorType.APPLICATION, ErrorTag.OPERATION_FAILED);
-    }
-
-    private static @Nullable String extractStringLeaf(final ContainerNode data, final NodeIdentifier childName) {
-        return data.childByArg(childName) instanceof LeafNode<?> leafNode && leafNode.body() instanceof String str
-            ? str : null;
-    }
-}
index bfab58135c45149a2c1604c06c1a5ede6291b487..94c0898921816f630cab535caafe30b832cb11b1 100644 (file)
@@ -160,13 +160,11 @@ public final class RestconfInvokeOperationsServiceImpl {
         if (mountPoint == null) {
             // Hacked-up integration of streams
             if (CreateDataChangeEventSubscription.QNAME.equals(type)) {
-                return CreateStreamUtil.createDataChangeNotifiStream(listenersBroker, input,
-                    localDatabind.modelContext());
+                return listenersBroker.createDataChangeNotifiStream(input, localDatabind.modelContext());
             } else if (CreateNotificationStream.QNAME.equals(type)) {
-                return CreateStreamUtil.createNotificationStream(listenersBroker, input,
-                    localDatabind.modelContext());
+                return listenersBroker.createNotificationStream(input, localDatabind.modelContext());
             } else if (SubscribeDeviceNotification.QNAME.equals(type)) {
-                return CreateStreamUtil.createDeviceNotificationListener(listenersBroker, input,
+                return listenersBroker.createDeviceNotificationListener(input,
                     listenersBroker.prepareUriByStreamName(uriInfo, "").toString(), mountPointService);
             }
         }
index a4bf15d92491afd80bef465de3b38d92741031cb..283fe5ca346f208c136636dd09776eb665b434e7 100644 (file)
@@ -24,23 +24,44 @@ import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
 import org.opendaylight.mdsal.dom.api.DOMDataBroker;
 import org.opendaylight.mdsal.dom.api.DOMDataTreeWriteOperations;
 import org.opendaylight.mdsal.dom.api.DOMDataTreeWriteTransaction;
+import org.opendaylight.mdsal.dom.api.DOMMountPoint;
 import org.opendaylight.mdsal.dom.api.DOMMountPointService;
 import org.opendaylight.mdsal.dom.api.DOMNotificationService;
+import org.opendaylight.mdsal.dom.api.DOMRpcResult;
+import org.opendaylight.mdsal.dom.api.DOMSchemaService;
 import org.opendaylight.restconf.common.errors.RestconfDocumentedException;
+import org.opendaylight.restconf.common.errors.RestconfFuture;
 import org.opendaylight.restconf.nb.rfc8040.ReceiveEventsParams;
 import org.opendaylight.restconf.nb.rfc8040.URLConstants;
 import org.opendaylight.restconf.nb.rfc8040.databind.DatabindProvider;
 import org.opendaylight.restconf.nb.rfc8040.utils.parser.IdentifierCodec;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.device.notification.rev221106.SubscribeDeviceNotificationInput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.device.notification.rev221106.SubscribeDeviceNotificationOutput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.remote.rev140114.CreateDataChangeEventSubscriptionInput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.remote.rev140114.CreateDataChangeEventSubscriptionOutput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.remote.rev140114.CreateNotificationStreamInput;
 import org.opendaylight.yang.gen.v1.urn.sal.restconf.event.subscription.rev140708.CreateDataChangeEventSubscriptionInput1.Scope;
+import org.opendaylight.yang.gen.v1.urn.sal.restconf.event.subscription.rev140708.NotificationOutputTypeGrouping;
 import org.opendaylight.yang.gen.v1.urn.sal.restconf.event.subscription.rev140708.NotificationOutputTypeGrouping.NotificationOutputType;
 import org.opendaylight.yangtools.yang.common.ErrorTag;
 import org.opendaylight.yangtools.yang.common.ErrorType;
 import org.opendaylight.yangtools.yang.common.QName;
+import org.opendaylight.yangtools.yang.common.QNameModule;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifier;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifierWithPredicates;
+import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
+import org.opendaylight.yangtools.yang.data.api.schema.DataContainerChild;
+import org.opendaylight.yangtools.yang.data.api.schema.LeafNode;
+import org.opendaylight.yangtools.yang.data.api.schema.LeafSetEntryNode;
+import org.opendaylight.yangtools.yang.data.api.schema.LeafSetNode;
 import org.opendaylight.yangtools.yang.data.api.schema.MapEntryNode;
+import org.opendaylight.yangtools.yang.data.impl.schema.Builders;
+import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
 import org.opendaylight.yangtools.yang.model.api.EffectiveModelContext;
 import org.opendaylight.yangtools.yang.model.api.NotificationDefinition;
 import org.opendaylight.yangtools.yang.model.api.stmt.NotificationEffectiveStatement;
+import org.opendaylight.yangtools.yang.model.api.stmt.SchemaNodeIdentifier.Absolute;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -161,6 +182,33 @@ public abstract sealed class ListenersBroker {
     private static final String NOTIFICATION_STREAM = "notification-stream";
     private static final String DEVICE_NOTIFICATION_STREAM = "device-notification-stream";
 
+    private static final QNameModule SAL_REMOTE_AUGMENT = NotificationOutputTypeGrouping.QNAME.getModule();
+
+    private static final QNameModule DEVICE_NOTIFICATION_MODULE = SubscribeDeviceNotificationInput.QNAME.getModule();
+    private static final QName DATASTORE_QNAME =
+        QName.create(SAL_REMOTE_AUGMENT, RestconfStreamsConstants.DATASTORE_PARAM_NAME).intern();
+    private static final QName SCOPE_QNAME =
+        QName.create(SAL_REMOTE_AUGMENT, RestconfStreamsConstants.SCOPE_PARAM_NAME).intern();
+    private static final QName OUTPUT_TYPE_QNAME =
+        QName.create(SAL_REMOTE_AUGMENT, "notification-output-type").intern();
+    private static final QName DEVICE_NOTIFICATION_PATH_QNAME =
+        QName.create(DEVICE_NOTIFICATION_MODULE, "path").intern();
+    private static final QName DEVICE_NOTIFICATION_STREAM_PATH =
+        QName.create(DEVICE_NOTIFICATION_PATH_QNAME, "stream-path").intern();
+    private static final NodeIdentifier DATASTORE_NODEID = NodeIdentifier.create(DATASTORE_QNAME);
+    private static final NodeIdentifier SCOPE_NODEID = NodeIdentifier.create(SCOPE_QNAME);
+    private static final NodeIdentifier OUTPUT_TYPE_NODEID = NodeIdentifier.create(OUTPUT_TYPE_QNAME);
+    private static final NodeIdentifier DEVICE_NOTIFICATION_PATH_NODEID =
+        NodeIdentifier.create(DEVICE_NOTIFICATION_PATH_QNAME);
+    private static final NodeIdentifier SAL_REMOTE_OUTPUT_NODEID =
+        NodeIdentifier.create(CreateDataChangeEventSubscriptionOutput.QNAME);
+    private static final NodeIdentifier NOTIFICATIONS =
+        NodeIdentifier.create(QName.create(CreateNotificationStreamInput.QNAME, "notifications").intern());
+    private static final NodeIdentifier PATH_NODEID =
+        NodeIdentifier.create(QName.create(CreateDataChangeEventSubscriptionInput.QNAME, "path").intern());
+    private static final NodeIdentifier STREAM_NAME_NODEID =
+        NodeIdentifier.create(QName.create(CreateDataChangeEventSubscriptionOutput.QNAME, "stream-name").intern());
+
     private final StampedLock dataChangeListenersLock = new StampedLock();
     private final StampedLock notificationListenersLock = new StampedLock();
     private final StampedLock deviceNotificationListenersLock = new StampedLock();
@@ -331,7 +379,7 @@ public abstract sealed class ListenersBroker {
      * @param mountPointService Mount point service
      * @return Created or existing device notification listener adapter.
      */
-    public final DeviceNotificationListenerAdaptor registerDeviceNotificationListener(final String deviceName,
+    private DeviceNotificationListenerAdaptor registerDeviceNotificationListener(final String deviceName,
             final NotificationOutputType outputType, final EffectiveModelContext refSchemaCtx,
             final DOMMountPointService mountPointService, final YangInstanceIdentifier path) {
         final var sb = new StringBuilder(DEVICE_NOTIFICATION_STREAM).append('/')
@@ -635,4 +683,166 @@ public abstract sealed class ListenersBroker {
             throw new RestconfDocumentedException("Problem while putting data to DS.", e);
         }
     }
+
+
+    /**
+     * Create data-change-event stream with POST operation via RPC.
+     *
+     * @param input Input of RPC - example in JSON (data-change-event stream):
+     *              <pre>
+     *              {@code
+     *                  {
+     *                      "input": {
+     *                          "path": "/toaster:toaster/toaster:toasterStatus",
+     *                          "sal-remote-augment:datastore": "OPERATIONAL",
+     *                          "sal-remote-augment:scope": "ONE"
+     *                      }
+     *                  }
+     *              }
+     *              </pre>
+     * @param modelContext Reference to {@link EffectiveModelContext}.
+     * @return {@link DOMRpcResult} - Output of RPC - example in JSON:
+     *     <pre>
+     *     {@code
+     *         {
+     *             "output": {
+     *                 "stream-name": "toaster:toaster/toaster:toasterStatus/datastore=OPERATIONAL/scope=ONE"
+     *             }
+     *         }
+     *     }
+     *     </pre>
+     */
+    // FIXME: this really should be a normal RPC implementation
+    public final RestconfFuture<Optional<ContainerNode>> createDataChangeNotifiStream(final ContainerNode input,
+            final EffectiveModelContext modelContext) {
+        final var datastoreName = extractStringLeaf(input, DATASTORE_NODEID);
+        final var scopeName = extractStringLeaf(input, SCOPE_NODEID);
+        final var adapter = registerDataChangeListener(modelContext,
+            datastoreName != null ? LogicalDatastoreType.valueOf(datastoreName) : LogicalDatastoreType.CONFIGURATION,
+            preparePath(input), scopeName != null ? Scope.ofName(scopeName) : Scope.BASE, prepareOutputType(input));
+
+        // building of output
+        return RestconfFuture.of(Optional.of(Builders.containerBuilder()
+            .withNodeIdentifier(SAL_REMOTE_OUTPUT_NODEID)
+            .withChild(ImmutableNodes.leafNode(STREAM_NAME_NODEID, adapter.getStreamName()))
+            .build()));
+    }
+
+    // FIXME: this really should be a normal RPC implementation
+    public final RestconfFuture<Optional<ContainerNode>> createNotificationStream(final ContainerNode input,
+            final EffectiveModelContext modelContext) {
+        final var qnames = ((LeafSetNode<String>) input.getChildByArg(NOTIFICATIONS)).body().stream()
+            .map(LeafSetEntryNode::body)
+            .map(QName::create)
+            .sorted()
+            .collect(ImmutableSet.toImmutableSet());
+
+        for (var qname : qnames) {
+            if (modelContext.findNotification(qname).isEmpty()) {
+                throw new RestconfDocumentedException(qname + " refers to an unknown notification",
+                    ErrorType.APPLICATION, ErrorTag.INVALID_VALUE);
+            }
+        }
+
+        // registration of the listener
+        final var adapter = registerNotificationListener(modelContext, qnames, prepareOutputType(input));
+
+        return RestconfFuture.of(Optional.of(Builders.containerBuilder()
+            .withNodeIdentifier(SAL_REMOTE_OUTPUT_NODEID)
+            .withChild(ImmutableNodes.leafNode(STREAM_NAME_NODEID, adapter.getStreamName()))
+            .build()));
+    }
+
+    /**
+     * Create device notification stream.
+     *
+     * @param baseUrl base Url
+     * @param input RPC input
+     * @param mountPointService dom mount point service
+     * @return {@link DOMRpcResult} - Output of RPC - example in JSON
+     */
+    // FIXME: this should be an RPC invocation
+    public final RestconfFuture<Optional<ContainerNode>> createDeviceNotificationListener(final ContainerNode input,
+            final String baseUrl, final DOMMountPointService mountPointService) {
+        // parsing out of container with settings and path
+        // FIXME: ugly cast
+        final var path = (YangInstanceIdentifier) input.findChildByArg(DEVICE_NOTIFICATION_PATH_NODEID)
+                .map(DataContainerChild::body)
+                .orElseThrow(() -> new RestconfDocumentedException("No path specified", ErrorType.APPLICATION,
+                    ErrorTag.DATA_MISSING));
+
+        if (!(path.getLastPathArgument() instanceof NodeIdentifierWithPredicates listId)) {
+            throw new RestconfDocumentedException("Path does not refer to a list item", ErrorType.APPLICATION,
+                ErrorTag.INVALID_VALUE);
+        }
+        if (listId.size() != 1) {
+            throw new RestconfDocumentedException("Target list uses multiple keys", ErrorType.APPLICATION,
+                ErrorTag.INVALID_VALUE);
+        }
+        final String deviceName = listId.values().iterator().next().toString();
+
+        final DOMMountPoint mountPoint = mountPointService.getMountPoint(path)
+            .orElseThrow(() -> new RestconfDocumentedException("Mount point not available", ErrorType.APPLICATION,
+                ErrorTag.OPERATION_FAILED));
+
+        final DOMNotificationService mountNotifService = mountPoint.getService(DOMNotificationService.class)
+            .orElseThrow(() -> new RestconfDocumentedException("Mount point does not support notifications",
+                ErrorType.APPLICATION, ErrorTag.OPERATION_FAILED));
+
+        final var mountModelContext = mountPoint.getService(DOMSchemaService.class)
+            .orElseThrow(() -> new RestconfDocumentedException("Mount point schema not available",
+                ErrorType.APPLICATION, ErrorTag.OPERATION_FAILED))
+            .getGlobalContext();
+        final var notificationPaths = mountModelContext.getModuleStatements().values().stream()
+            .flatMap(module -> module.streamEffectiveSubstatements(NotificationEffectiveStatement.class))
+            .map(notification -> Absolute.of(notification.argument()))
+            .collect(ImmutableSet.toImmutableSet());
+        if (notificationPaths.isEmpty()) {
+            throw new RestconfDocumentedException("Device does not support notification", ErrorType.APPLICATION,
+                ErrorTag.OPERATION_FAILED);
+        }
+
+        final var notificationListenerAdapter = registerDeviceNotificationListener(deviceName,
+            prepareOutputType(input), mountModelContext, mountPointService, mountPoint.getIdentifier());
+        notificationListenerAdapter.listen(mountNotifService, notificationPaths);
+
+        return RestconfFuture.of(Optional.of(Builders.containerBuilder()
+            .withNodeIdentifier(new NodeIdentifier(SubscribeDeviceNotificationOutput.QNAME))
+            .withChild(ImmutableNodes.leafNode(DEVICE_NOTIFICATION_STREAM_PATH,
+                baseUrl + notificationListenerAdapter.getStreamName()))
+            .build()));
+    }
+
+    /**
+     * Prepare {@link NotificationOutputType}.
+     *
+     * @param data Container with stream settings (RPC create-stream).
+     * @return Parsed {@link NotificationOutputType}.
+     */
+    private static NotificationOutputType prepareOutputType(final ContainerNode data) {
+        final String outputName = extractStringLeaf(data, OUTPUT_TYPE_NODEID);
+        return outputName != null ? NotificationOutputType.valueOf(outputName) : NotificationOutputType.XML;
+    }
+
+    /**
+     * Prepare {@link YangInstanceIdentifier} of stream source.
+     *
+     * @param data Container with stream settings (RPC create-stream).
+     * @return Parsed {@link YangInstanceIdentifier} of data element from which the data-change-event notifications
+     *         are going to be generated.
+     */
+    private static YangInstanceIdentifier preparePath(final ContainerNode data) {
+        final var pathLeaf = data.childByArg(PATH_NODEID);
+        if (pathLeaf != null && pathLeaf.body() instanceof YangInstanceIdentifier pathValue) {
+            return pathValue;
+        }
+
+        throw new RestconfDocumentedException("Instance identifier was not normalized correctly",
+            ErrorType.APPLICATION, ErrorTag.OPERATION_FAILED);
+    }
+
+    private static @Nullable String extractStringLeaf(final ContainerNode data, final NodeIdentifier childName) {
+        return data.childByArg(childName) instanceof LeafNode<?> leafNode && leafNode.body() instanceof String str
+            ? str : null;
+    }
 }
similarity index 91%
rename from restconf/restconf-nb/src/test/java/org/opendaylight/restconf/nb/rfc8040/rests/services/impl/CreateStreamUtilTest.java
rename to restconf/restconf-nb/src/test/java/org/opendaylight/restconf/nb/rfc8040/streams/ListenersBrokerTest.java
index cd054299a07a2b15817b8755f376aef455e7f4f2..d87f8542106cbe1c974d1707cfc831e66218c002 100644 (file)
@@ -5,7 +5,7 @@
  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
  * and is available at http://www.eclipse.org/legal/epl-v10.html
  */
-package org.opendaylight.restconf.nb.rfc8040.rests.services.impl;
+package org.opendaylight.restconf.nb.rfc8040.streams;
 
 import static org.hamcrest.CoreMatchers.instanceOf;
 import static org.hamcrest.MatcherAssert.assertThat;
@@ -19,7 +19,6 @@ import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.mockito.junit.MockitoJUnitRunner;
 import org.opendaylight.restconf.common.errors.RestconfDocumentedException;
-import org.opendaylight.restconf.nb.rfc8040.streams.ListenersBroker;
 import org.opendaylight.yangtools.yang.common.ErrorTag;
 import org.opendaylight.yangtools.yang.common.ErrorType;
 import org.opendaylight.yangtools.yang.common.QName;
@@ -37,7 +36,7 @@ import org.opendaylight.yangtools.yang.model.api.RpcDefinition;
 import org.opendaylight.yangtools.yang.test.util.YangParserTestUtils;
 
 @RunWith(MockitoJUnitRunner.StrictStubs.class)
-public class CreateStreamUtilTest {
+public class ListenersBrokerTest {
     private static EffectiveModelContext SCHEMA_CTX;
 
     private final ListenersBroker listenersBroker = new ListenersBroker.ServerSentEvents();
@@ -52,7 +51,7 @@ public class CreateStreamUtilTest {
         assertEquals(prepareDomPayload("create-data-change-event-subscription",
             RpcDefinition::getOutput,
             "data-change-event-subscription/toaster:toaster/datastore=CONFIGURATION/scope=BASE", "stream-name"),
-            CreateStreamUtil.createDataChangeNotifiStream(listenersBroker,
+            listenersBroker.createDataChangeNotifiStream(
                 prepareDomPayload("create-data-change-event-subscription", RpcDefinition::getInput, "toaster", "path"),
                 SCHEMA_CTX).getOrThrow().orElseThrow());
     }
@@ -62,7 +61,7 @@ public class CreateStreamUtilTest {
         final var payload = prepareDomPayload("create-data-change-event-subscription", RpcDefinition::getInput,
             "String value", "path");
         final var errors = assertThrows(RestconfDocumentedException.class,
-            () -> CreateStreamUtil.createDataChangeNotifiStream(listenersBroker, payload, SCHEMA_CTX)).getErrors();
+            () -> listenersBroker.createDataChangeNotifiStream(payload, SCHEMA_CTX)).getErrors();
         assertEquals(1, errors.size());
         final var error = errors.get(0);
         assertEquals(ErrorType.APPLICATION, error.getErrorType());
@@ -75,7 +74,7 @@ public class CreateStreamUtilTest {
         final var payload = prepareDomPayload("create-data-change-event-subscription2", RpcDefinition::getInput,
             "toaster", "path2");
         final var errors = assertThrows(RestconfDocumentedException.class,
-            () -> CreateStreamUtil.createDataChangeNotifiStream(listenersBroker, payload, SCHEMA_CTX)).getErrors();
+            () -> listenersBroker.createDataChangeNotifiStream(payload, SCHEMA_CTX)).getErrors();
         assertEquals(1, errors.size());
         final var error = errors.get(0);
         assertEquals(ErrorType.APPLICATION, error.getErrorType());