Populate restconf-state/streams 75/108875/2
authorRobert Varga <robert.varga@pantheon.tech>
Tue, 7 Nov 2023 19:17:08 +0000 (20:17 +0100)
committerRobert Varga <robert.varga@pantheon.tech>
Wed, 8 Nov 2023 10:14:17 +0000 (11:14 +0100)
ListenersBroker needs to manage restconf-state/streams, so users
can locate the endpoint servicing each stream.

Pick up the required pieces from RestconfStateStreams and surrounding
commented-out code to generate proper entries.

Also clean up tests, so that we have nice and crispy test suite, without
duplications and asserting proper changes.

JIRA: NETCONF-1102
Change-Id: Ifddb9fba119e6580ac9cb1c6377b63285d8d6cb0
Signed-off-by: Robert Varga <robert.varga@pantheon.tech>
restconf/restconf-common/src/main/java/org/opendaylight/restconf/common/errors/RestconfFuture.java
restconf/restconf-nb/src/main/java/org/opendaylight/restconf/nb/rfc8040/rests/services/impl/RestconfInvokeOperationsServiceImpl.java
restconf/restconf-nb/src/main/java/org/opendaylight/restconf/nb/rfc8040/streams/ListenersBroker.java
restconf/restconf-nb/src/main/java/org/opendaylight/restconf/nb/rfc8040/streams/RestconfStateStreams.java [deleted file]
restconf/restconf-nb/src/test/java/org/opendaylight/restconf/nb/rfc8040/streams/ListenersBrokerTest.java
restconf/restconf-nb/src/test/java/org/opendaylight/restconf/nb/rfc8040/streams/RestconfStateStreamsTest.java
restconf/restconf-nb/src/test/java/org/opendaylight/restconf/nb/rfc8040/streams/RestconfStreamsSubscriptionServiceImplTest.java [deleted file]
restconf/restconf-nb/src/test/java/org/opendaylight/restconf/nb/rfc8040/streams/WebSocketFactoryTest.java

index 11f60a4b5a259d142737af12d52db0fa0b8213b9..bd70f175727c2da518d9b1c16d318b63f9298a85 100644 (file)
@@ -15,6 +15,7 @@ import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.MoreExecutors;
 import java.util.concurrent.ExecutionException;
+import java.util.function.Function;
 import org.eclipse.jdt.annotation.NonNull;
 
 /**
@@ -45,8 +46,41 @@ public sealed class RestconfFuture<V> extends AbstractFuture<@NonNull V> permits
         return false;
     }
 
-    public final void addCallback(final RestconfCallback<? super V> callback) {
+    /**
+     * Add a callback to invoke when this future completes, or immediately if it is already complete.
+     *
+     * @param callback Callback to invoke
+     * @return This future
+     * @throws NullPointerException if {@code callback} is {@code null}
+     */
+    public final @NonNull RestconfFuture<V> addCallback(final RestconfCallback<? super V> callback) {
         Futures.addCallback(this, callback, MoreExecutors.directExecutor());
+        return this;
+    }
+
+    /**
+     * Transform the result of this future using the specified function.
+     *
+     * @param <T> Resulting type
+     * @param function Function to apply
+     * @return Transformed future
+     * @throws NullPointerException if {@code function} is {@code null}
+     */
+    public final <T> @NonNull RestconfFuture<T> transform(final Function<@NonNull V, @NonNull T> function) {
+        final var fun = requireNonNull(function);
+        final var ret = new RestconfFuture<T>();
+        addCallback(new RestconfCallback<>() {
+            @Override
+            public void onSuccess(final V result) {
+                ret.set(requireNonNull(fun.apply(result)));
+            }
+
+            @Override
+            protected void onFailure(final RestconfDocumentedException failure) {
+                ret.setException(failure);
+            }
+        });
+        return ret;
     }
 
     /**
index ba346794eeba7cd17ea6524ac0711f55f36737e4..10bc8c5cf4180447e3e6e3c7434b75631e9c72af 100644 (file)
@@ -160,13 +160,14 @@ public final class RestconfInvokeOperationsServiceImpl {
         if (mountPoint == null) {
             // Hacked-up integration of streams
             if (CreateDataChangeEventSubscription.QNAME.equals(type)) {
-                return listenersBroker.createDataChangeNotifiStream(databindProvider, input,
+                return listenersBroker.createDataChangeNotifiStream(databindProvider, uriInfo, input,
                     localDatabind.modelContext());
             } else if (CreateNotificationStream.QNAME.equals(type)) {
-                return listenersBroker.createNotificationStream(databindProvider, input, localDatabind.modelContext());
+                return listenersBroker.createNotificationStream(databindProvider, uriInfo, input,
+                    localDatabind.modelContext());
             } else if (SubscribeDeviceNotification.QNAME.equals(type)) {
-                return listenersBroker.createDeviceNotificationStream(input,
-                    listenersBroker.prepareUriByStreamName(uriInfo, "").toString(), mountPointService);
+                return listenersBroker.createDeviceNotificationStream(uriInfo, input, localDatabind.modelContext(),
+                    mountPointService);
             }
         }
 
index 689fa35dcac49bd118dfb0afe339f9c6bb1ea76c..b4bc0cf46dde7bf681317f7b3cfaac2e98748502 100644 (file)
@@ -9,10 +9,10 @@ package org.opendaylight.restconf.nb.rfc8040.streams;
 
 import static java.util.Objects.requireNonNull;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.ImmutableSet;
 import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.MoreExecutors;
-import java.net.URI;
 import java.util.Optional;
 import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
@@ -30,11 +30,14 @@ import org.opendaylight.mdsal.dom.api.DOMRpcResult;
 import org.opendaylight.mdsal.dom.api.DOMSchemaService;
 import org.opendaylight.restconf.common.errors.RestconfDocumentedException;
 import org.opendaylight.restconf.common.errors.RestconfFuture;
+import org.opendaylight.restconf.common.errors.SettableRestconfFuture;
 import org.opendaylight.restconf.nb.rfc8040.URLConstants;
 import org.opendaylight.restconf.nb.rfc8040.databind.DatabindProvider;
+import org.opendaylight.restconf.nb.rfc8040.utils.parser.IdentifierCodec;
 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.restconf.monitoring.rev170126.RestconfState;
 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.restconf.monitoring.rev170126.restconf.state.Streams;
 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.restconf.monitoring.rev170126.restconf.state.streams.Stream;
+import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.restconf.monitoring.rev170126.restconf.state.streams.stream.Access;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.device.notification.rev221106.SubscribeDeviceNotificationInput;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.device.notification.rev221106.SubscribeDeviceNotificationOutput;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.remote.rev140114.CreateDataChangeEventSubscriptionInput;
@@ -54,6 +57,8 @@ import org.opendaylight.yangtools.yang.data.api.schema.DataContainerChild;
 import org.opendaylight.yangtools.yang.data.api.schema.LeafNode;
 import org.opendaylight.yangtools.yang.data.api.schema.LeafSetEntryNode;
 import org.opendaylight.yangtools.yang.data.api.schema.LeafSetNode;
+import org.opendaylight.yangtools.yang.data.api.schema.MapEntryNode;
+import org.opendaylight.yangtools.yang.data.api.schema.MapNode;
 import org.opendaylight.yangtools.yang.data.impl.schema.Builders;
 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
 import org.opendaylight.yangtools.yang.model.api.EffectiveModelContext;
@@ -79,10 +84,11 @@ public abstract sealed class ListenersBroker {
         }
 
         @Override
-        public URI prepareUriByStreamName(final UriInfo uriInfo, final String streamName) {
+        public String baseStreamLocation(final UriInfo uriInfo) {
             return uriInfo.getBaseUriBuilder()
-                .replacePath(URLConstants.BASE_PATH + '/' + URLConstants.STREAMS_SUBPATH + '/' + streamName)
-                .build();
+                .replacePath(URLConstants.BASE_PATH + '/' + URLConstants.STREAMS_SUBPATH)
+                .build()
+                .toString();
         }
     }
 
@@ -95,7 +101,7 @@ public abstract sealed class ListenersBroker {
         }
 
         @Override
-        public URI prepareUriByStreamName(final UriInfo uriInfo, final String streamName) {
+        public String baseStreamLocation(final UriInfo uriInfo) {
             final var scheme = switch (uriInfo.getAbsolutePath().getScheme()) {
                 // Secured HTTP goes to Secured WebSockets
                 case "https" -> "wss";
@@ -105,8 +111,9 @@ public abstract sealed class ListenersBroker {
 
             return uriInfo.getBaseUriBuilder()
                 .scheme(scheme)
-                .replacePath(URLConstants.BASE_PATH + '/' + URLConstants.STREAMS_SUBPATH + '/' + streamName)
-                .build();
+                .replacePath(URLConstants.BASE_PATH + '/' + URLConstants.STREAMS_SUBPATH)
+                .build()
+                .toString();
         }
     }
 
@@ -126,89 +133,31 @@ public abstract sealed class ListenersBroker {
         @NonNull T createStream(@NonNull String name);
     }
 
-    /**
-     * Holder of all handlers for notifications.
-     */
-    // FIXME: why do we even need this class?!
-    private record HandlersHolder(
-            @NonNull DOMDataBroker dataBroker,
-            @NonNull DOMNotificationService notificationService,
-            @NonNull DatabindProvider databindProvider) {
-
-        HandlersHolder {
-            requireNonNull(dataBroker);
-            requireNonNull(notificationService);
-            requireNonNull(databindProvider);
-        }
-    }
-
-//    private static final QName LOCATION_QNAME = QName.create(Notifi.QNAME, "location").intern();
-//    private static final NodeIdentifier LOCATION_NODEID = NodeIdentifier.create(LOCATION_QNAME);
-//
-//    private final ListenersBroker listenersBroker;
-//    private final HandlersHolder handlersHolder;
-//
-//  // FIXME: NETCONF:1102: do not instantiate this service
-//  new RestconfStreamsSubscriptionServiceImpl(dataBroker, notificationService, databindProvider,
-//      listenersBroker),
-//
-//    /**
-//     * Initialize holder of handlers with holders as parameters.
-//     *
-//     * @param dataBroker {@link DOMDataBroker}
-//     * @param notificationService {@link DOMNotificationService}
-//     * @param databindProvider a {@link DatabindProvider}
-//     * @param listenersBroker a {@link ListenersBroker}
-//     */
-//    public RestconfStreamsSubscriptionServiceImpl(final DOMDataBroker dataBroker,
-//            final DOMNotificationService notificationService, final DatabindProvider databindProvider,
-//            final ListenersBroker listenersBroker) {
-//        handlersHolder = new HandlersHolder(dataBroker, notificationService, databindProvider);
-//        this.listenersBroker = requireNonNull(listenersBroker);
-//    }
-//
-//    @Override
-//    public Response subscribeToStream(final String identifier, final UriInfo uriInfo) {
-//        final var params = QueryParams.newReceiveEventsParams(uriInfo);
-//
-//        final URI location;
-//        if (identifier.contains(RestconfStreamsConstants.DATA_SUBSCRIPTION)) {
-//            location = listenersBroker.subscribeToDataStream(identifier, uriInfo, params, handlersHolder);
-//        } else if (identifier.contains(RestconfStreamsConstants.NOTIFICATION_STREAM)) {
-//            location = listenersBroker.subscribeToYangStream(identifier, uriInfo, params, handlersHolder);
-//        } else {
-//            final String msg = "Bad type of notification of sal-remote";
-//            LOG.warn(msg);
-//            throw new RestconfDocumentedException(msg);
-//        }
-//
-//        return Response.ok()
-//            .location(location)
-//            .entity(new NormalizedNodePayload(
-//                Inference.ofDataTreePath(handlersHolder.databindProvider().currentContext().modelContext(),
-//                    Notifi.QNAME, LOCATION_QNAME),
-//                ImmutableNodes.leafNode(LOCATION_NODEID, location.toString())))
-//            .build();
-//    }
-
     private static final Logger LOG = LoggerFactory.getLogger(ListenersBroker.class);
     private static final YangInstanceIdentifier RESTCONF_STATE_STREAMS = YangInstanceIdentifier.of(
         NodeIdentifier.create(RestconfState.QNAME),
         NodeIdentifier.create(Streams.QNAME),
         NodeIdentifier.create(Stream.QNAME));
 
-    private static final QName DATASTORE_QNAME =
-        QName.create(CreateDataChangeEventSubscriptionInput1.QNAME, "datastore").intern();
-    private static final QName OUTPUT_TYPE_QNAME =
-        QName.create(NotificationOutputTypeGrouping.QNAME, "notification-output-type").intern();
-    private static final QName DEVICE_NOTIFICATION_PATH_QNAME =
-        QName.create(SubscribeDeviceNotificationInput.QNAME, "path").intern();
-    private static final QName DEVICE_NOTIFICATION_STREAM_PATH =
-        QName.create(DEVICE_NOTIFICATION_PATH_QNAME, "stream-path").intern();
-    private static final NodeIdentifier DATASTORE_NODEID = NodeIdentifier.create(DATASTORE_QNAME);
-    private static final NodeIdentifier OUTPUT_TYPE_NODEID = NodeIdentifier.create(OUTPUT_TYPE_QNAME);
+    @VisibleForTesting
+    static final QName NAME_QNAME =  QName.create(Stream.QNAME, "name").intern();
+    @VisibleForTesting
+    static final QName DESCRIPTION_QNAME = QName.create(Stream.QNAME, "description").intern();
+    @VisibleForTesting
+    static final QName ENCODING_QNAME =  QName.create(Stream.QNAME, "encoding").intern();
+    @VisibleForTesting
+    static final QName LOCATION_QNAME =  QName.create(Stream.QNAME, "location").intern();
+
+    private static final NodeIdentifier DATASTORE_NODEID = NodeIdentifier.create(
+        QName.create(CreateDataChangeEventSubscriptionInput1.QNAME, "datastore").intern());
+    @Deprecated(forRemoval = true)
+    private static final NodeIdentifier OUTPUT_TYPE_NODEID = NodeIdentifier.create(
+        QName.create(NotificationOutputTypeGrouping.QNAME, "notification-output-type").intern());
     private static final NodeIdentifier DEVICE_NOTIFICATION_PATH_NODEID =
-        NodeIdentifier.create(DEVICE_NOTIFICATION_PATH_QNAME);
+        NodeIdentifier.create(QName.create(SubscribeDeviceNotificationInput.QNAME, "path").intern());
+    private static final NodeIdentifier DEVICE_NOTIFICATION_STREAM_PATH_NODEID =
+        NodeIdentifier.create(QName.create(SubscribeDeviceNotificationInput.QNAME, "stream-path").intern());
+
     private static final NodeIdentifier SAL_REMOTE_OUTPUT_NODEID =
         NodeIdentifier.create(CreateDataChangeEventSubscriptionOutput.QNAME);
     private static final NodeIdentifier NOTIFICATIONS =
@@ -238,14 +187,16 @@ public abstract sealed class ListenersBroker {
 
     /**
      * Create a {@link RestconfStream} with a unique name. This method will atomically generate a stream name, create
-     * the corresponding instance and register it
+     * the corresponding instance and register it.
      *
      * @param <T> Stream type
+     * @param baseStreamLocation base streams location
      * @param factory Factory for creating the actual stream instance
      * @return A {@link RestconfStream} instance
      * @throws NullPointerException if {@code factory} is {@code null}
      */
-    public final <T extends RestconfStream<?>> @NonNull T createStream(final StreamFactory<T> factory) {
+    final <T extends RestconfStream<?>> @NonNull RestconfFuture<T> createStream(final String description,
+            final String baseStreamLocation, final StreamFactory<T> factory) {
         String name;
         T stream;
         do {
@@ -255,7 +206,31 @@ public abstract sealed class ListenersBroker {
             stream = factory.createStream(name);
         } while (streams.putIfAbsent(name, stream) != null);
 
-        return stream;
+        // final captures for use with FutureCallback
+        final var streamName = name;
+        final var finalStream = stream;
+
+        // Now issue a put operation
+        final var ret = new SettableRestconfFuture<T>();
+        final var tx = dataBroker.newWriteOnlyTransaction();
+
+        tx.put(LogicalDatastoreType.OPERATIONAL, restconfStateStreamPath(streamName),
+            streamEntry(streamName, description, baseStreamLocation + '/' + streamName, ""));
+        tx.commit().addCallback(new FutureCallback<CommitInfo>() {
+            @Override
+            public void onSuccess(final CommitInfo result) {
+                LOG.debug("Stream {} added", streamName);
+                ret.set(finalStream);
+            }
+
+            @Override
+            public void onFailure(final Throwable cause) {
+                LOG.debug("Failed to add stream {}", streamName, cause);
+                streams.remove(streamName, finalStream);
+                ret.setFailure(new RestconfDocumentedException("Failed to allocate stream " + streamName, cause));
+            }
+        }, MoreExecutors.directExecutor());
+        return ret;
     }
 
     /**
@@ -290,57 +265,16 @@ public abstract sealed class ListenersBroker {
     }
 
     private static @NonNull YangInstanceIdentifier restconfStateStreamPath(final String streamName) {
-        return RESTCONF_STATE_STREAMS
-            .node(NodeIdentifierWithPredicates.of(Stream.QNAME, RestconfStateStreams.NAME_QNAME, streamName));
+        return RESTCONF_STATE_STREAMS.node(NodeIdentifierWithPredicates.of(Stream.QNAME, NAME_QNAME, streamName));
     }
 
     /**
-     * Creates string representation of stream name from URI. Removes slash from URI in start and end positions,
-     * and optionally {@link URLConstants#BASE_PATH} prefix.
-     *
-     * @param uri URI for creation of stream name.
-     * @return String representation of stream name.
-     */
-//    private static String createStreamNameFromUri(final String uri) {
-//        String result = requireNonNull(uri);
-//        while (true) {
-//            if (result.startsWith(URLConstants.BASE_PATH)) {
-//                result = result.substring(URLConstants.BASE_PATH.length());
-//            } else if (result.startsWith("/")) {
-//                result = result.substring(1);
-//            } else {
-//                break;
-//            }
-//        }
-//        if (result.endsWith("/")) {
-//            result = result.substring(0, result.length() - 1);
-//        }
-//        return result;
-//    }
-
-    /**
-     * Prepare URL from base name and stream name.
+     * Return the base location URL of the streams service based on request URI.
      *
-     * @param uriInfo base URL information
-     * @param streamName name of stream for create
-     * @return final URL
+     * @param uriInfo request URL information
+     * @return location URL
      */
-    public abstract @NonNull URI prepareUriByStreamName(UriInfo uriInfo, String streamName);
-
-    // FIXME: callers are utter duplicates, refactor them
-//    private static void writeDataToDS(final DOMDataTreeWriteOperations tx, final MapEntryNode mapToStreams) {
-//        // FIXME: use put() here
-//        tx.merge(LogicalDatastoreType.OPERATIONAL, RestconfStateStreams.restconfStateStreamPath(mapToStreams.name()),
-//            mapToStreams);
-//    }
-//
-//    private static void submitData(final DOMDataTreeWriteTransaction readWriteTransaction) {
-//        try {
-//            readWriteTransaction.commit().get();
-//        } catch (final InterruptedException | ExecutionException e) {
-//            throw new RestconfDocumentedException("Problem while putting data to DS.", e);
-//        }
-//    }
+    public abstract @NonNull String baseStreamLocation(UriInfo uriInfo);
 
     /**
      * Create data-change-event stream with POST operation via RPC.
@@ -352,7 +286,6 @@ public abstract sealed class ListenersBroker {
      *                      "input": {
      *                          "path": "/toaster:toaster/toaster:toasterStatus",
      *                          "sal-remote-augment:datastore": "OPERATIONAL",
-     *                          "sal-remote-augment:scope": "ONE"
      *                      }
      *                  }
      *              }
@@ -371,23 +304,25 @@ public abstract sealed class ListenersBroker {
      */
     // FIXME: this really should be a normal RPC implementation
     public final RestconfFuture<Optional<ContainerNode>> createDataChangeNotifiStream(
-            final DatabindProvider databindProvider, final ContainerNode input,
+            final DatabindProvider databindProvider, final UriInfo uriInfo, final ContainerNode input,
             final EffectiveModelContext modelContext) {
         final var datastoreName = extractStringLeaf(input, DATASTORE_NODEID);
         final var datastore = datastoreName != null ? LogicalDatastoreType.valueOf(datastoreName)
             : LogicalDatastoreType.CONFIGURATION;
         final var path = preparePath(input);
+
         final var outputType = prepareOutputType(input);
-        final var adapter = createStream(name -> new DataTreeChangeStream(this, name, outputType, databindProvider,
-            datastore, path));
-
-        // building of output
-        return RestconfFuture.of(Optional.of(Builders.containerBuilder()
-            .withNodeIdentifier(SAL_REMOTE_OUTPUT_NODEID)
-            .withChild(ImmutableNodes.leafNode(STREAM_NAME_NODEID, adapter.name()))
-            .build()));
+        return createStream(
+            "Events occuring in " + datastore + " datastore under /" + IdentifierCodec.serialize(path, modelContext),
+            baseStreamLocation(uriInfo),
+            name -> new DataTreeChangeStream(this, name, outputType, databindProvider, datastore, path))
+            .transform(stream -> Optional.of(Builders.containerBuilder()
+                .withNodeIdentifier(SAL_REMOTE_OUTPUT_NODEID)
+                .withChild(ImmutableNodes.leafNode(STREAM_NAME_NODEID, stream.name()))
+                .build()));
     }
 
+// FIXME: NETCONF-1102: this part needs to be invoked from subscriber
 //    /**
 //     * Register listener by streamName in identifier to listen to data change notifications, and put or delete
 //     * information about listener to DS according to ietf-restconf-monitoring.
@@ -408,27 +343,12 @@ public abstract sealed class ListenersBroker {
 //        }
 //
 //        listener.setQueryParams(notificationQueryParams);
-//
-//        final var dataBroker = handlersHolder.dataBroker();
-//        final var schemaHandler = handlersHolder.databindProvider();
-//        listener.setCloseVars(schemaHandler);
 //        listener.listen(dataBroker);
-//
-//        final var uri = prepareUriByStreamName(uriInfo, streamName);
-//        final var schemaContext = schemaHandler.currentContext().modelContext();
-//        final var serializedPath = IdentifierCodec.serialize(listener.getPath(), schemaContext);
-//
-//        final var mapToStreams = RestconfStateStreams.dataChangeStreamEntry(listener.getPath(),
-//                listener.getOutputType(), uri, schemaContext, serializedPath);
-//        final var writeTransaction = dataBroker.newWriteOnlyTransaction();
-//        writeDataToDS(writeTransaction, mapToStreams);
-//        submitData(writeTransaction);
-//        return uri;
 //    }
 
     // FIXME: this really should be a normal RPC implementation
     public final RestconfFuture<Optional<ContainerNode>> createNotificationStream(
-            final DatabindProvider databindProvider, final ContainerNode input,
+            final DatabindProvider databindProvider, final UriInfo uriInfo, final ContainerNode input,
             final EffectiveModelContext modelContext) {
         final var qnames = ((LeafSetNode<String>) input.getChildByArg(NOTIFICATIONS)).body().stream()
             .map(LeafSetEntryNode::body)
@@ -436,92 +356,51 @@ public abstract sealed class ListenersBroker {
             .sorted()
             .collect(ImmutableSet.toImmutableSet());
 
+        final var description = new StringBuilder("YANG notifications matching any of {");
+        var haveFirst = false;
         for (var qname : qnames) {
-            if (modelContext.findNotification(qname).isEmpty()) {
-                throw new RestconfDocumentedException(qname + " refers to an unknown notification",
+            final var module = modelContext.findModuleStatement(qname.getModule())
+                .orElseThrow(() -> new RestconfDocumentedException(qname + " refers to an unknown module",
+                    ErrorType.APPLICATION, ErrorTag.INVALID_VALUE));
+            final var stmt = module.findSchemaTreeNode(qname)
+                .orElseThrow(() -> new RestconfDocumentedException(qname + " refers to an unknown notification",
+                    ErrorType.APPLICATION, ErrorTag.INVALID_VALUE));
+            if (!(stmt instanceof NotificationEffectiveStatement)) {
+                throw new RestconfDocumentedException(qname + " refers to a non-notification",
                     ErrorType.APPLICATION, ErrorTag.INVALID_VALUE);
             }
-        }
 
-// FIXME: use this block to create a stream description
-//        final var module = refSchemaCtx.findModuleStatement(qname.getModule())
-//            .orElseThrow(() -> new RestconfDocumentedException(qname + " refers to an unknown module",
-//                ErrorType.APPLICATION, ErrorTag.INVALID_VALUE));
-//        final var stmt = module.findSchemaTreeNode(qname)
-//            .orElseThrow(() -> new RestconfDocumentedException(qname + " refers to an notification",
-//                ErrorType.APPLICATION, ErrorTag.INVALID_VALUE));
-//        if (!(stmt instanceof NotificationEffectiveStatement)) {
-//            throw new RestconfDocumentedException(qname + " refers to a non-notification",
-//                ErrorType.APPLICATION, ErrorTag.INVALID_VALUE);
-//        }
-//
-//        if (haveFirst) {
-//            sb.append(',');
-//        } else {
-//            haveFirst = true;
-//        }
-//        sb.append(module.argument().getLocalName()).append(':').append(qname.getLocalName());
+            if (haveFirst) {
+                description.append(",\n");
+            } else {
+                haveFirst = true;
+            }
+            description.append("\n  ")
+                .append(module.argument().getLocalName()).append(':').append(qname.getLocalName());
+        }
+        description.append("\n}");
 
         // registration of the listener
         final var outputType = prepareOutputType(input);
-        final var adapter = createStream(name -> new NotificationStream(this, name, outputType,
-            databindProvider, qnames));
-
-        return RestconfFuture.of(Optional.of(Builders.containerBuilder()
-            .withNodeIdentifier(SAL_REMOTE_OUTPUT_NODEID)
-            .withChild(ImmutableNodes.leafNode(STREAM_NAME_NODEID, adapter.name()))
-            .build()));
+        return createStream(description.toString(), baseStreamLocation(uriInfo),
+            name -> new NotificationStream(this, name, outputType, databindProvider, qnames))
+            .transform(stream -> Optional.of(Builders.containerBuilder()
+                .withNodeIdentifier(SAL_REMOTE_OUTPUT_NODEID)
+                .withChild(ImmutableNodes.leafNode(STREAM_NAME_NODEID, stream.name()))
+                .build()));
     }
 
-    /**
-     * Register listener by streamName in identifier to listen to yang notifications, and put or delete information
-     * about listener to DS according to ietf-restconf-monitoring.
-     *
-     * @param identifier              Name of the stream.
-     * @param uriInfo                 URI information.
-     * @param notificationQueryParams Query parameters of notification.
-     * @param handlersHolder          Holder of handlers for notifications.
-     * @return Stream location for listening.
-     */
-//    public final @NonNull URI subscribeToYangStream(final String identifier, final UriInfo uriInfo,
-//            final ReceiveEventsParams notificationQueryParams, final HandlersHolder handlersHolder) {
-//        final String streamName = createStreamNameFromUri(identifier);
-//        if (isNullOrEmpty(streamName)) {
-//            throw new RestconfDocumentedException("Stream name is empty", ErrorType.PROTOCOL, ErrorTag.INVALID_VALUE);
-//        }
-//
-//        final var notificationListenerAdapter = notificationListenerFor(streamName);
-//        if (notificationListenerAdapter == null) {
-//            throw new RestconfDocumentedException("Stream with name %s was not found".formatted(streamName),
-//                ErrorType.PROTOCOL, ErrorTag.UNKNOWN_ELEMENT);
-//        }
-//
-//        final URI uri = prepareUriByStreamName(uriInfo, streamName);
-//        notificationListenerAdapter.setQueryParams(notificationQueryParams);
-//        notificationListenerAdapter.listen(handlersHolder.notificationService());
-//        final DOMDataBroker dataBroker = handlersHolder.dataBroker();
-//        notificationListenerAdapter.setCloseVars(handlersHolder.databindProvider());
-//        final MapEntryNode mapToStreams = RestconfStateStreams.notificationStreamEntry(streamName,
-//            notificationListenerAdapter.qnames(), notificationListenerAdapter.getOutputType(), uri);
-//
-//        // FIXME: how does this correlate with the transaction notificationListenerAdapter.close() will do?
-//        final DOMDataTreeWriteTransaction writeTransaction = dataBroker.newWriteOnlyTransaction();
-//        writeDataToDS(writeTransaction, mapToStreams);
-//        submitData(writeTransaction);
-//        return uri;
-//    }
-
     /**
      * Create device notification stream.
      *
-     * @param baseUrl base Url
      * @param input RPC input
      * @param mountPointService dom mount point service
      * @return {@link DOMRpcResult} - Output of RPC - example in JSON
      */
     // FIXME: this should be an RPC invocation
-    public final RestconfFuture<Optional<ContainerNode>> createDeviceNotificationStream(final ContainerNode input,
-            final String baseUrl, final DOMMountPointService mountPointService) {
+    public final RestconfFuture<Optional<ContainerNode>> createDeviceNotificationStream(final UriInfo uriInfo,
+            final ContainerNode input, final EffectiveModelContext modelContext,
+            final DOMMountPointService mountPointService) {
         // parsing out of container with settings and path
         // FIXME: ugly cast
         final var path = (YangInstanceIdentifier) input.findChildByArg(DEVICE_NOTIFICATION_PATH_NODEID)
@@ -559,20 +438,21 @@ public abstract sealed class ListenersBroker {
                 ErrorTag.OPERATION_FAILED);
         }
 
-// FIXME: use this for description?
-//        final String deviceName = listId.values().iterator().next().toString();
-
+        final var baseStreamsUri = baseStreamLocation(uriInfo);
         final var outputType = prepareOutputType(input);
-        final var notificationListenerAdapter = createStream(
+        return createStream(
+            "All YANG notifications occuring on mount point /" + IdentifierCodec.serialize(path, modelContext),
+            baseStreamsUri,
             streamName -> new DeviceNotificationStream(this, streamName, outputType, mountModelContext,
-                mountPointService, mountPoint.getIdentifier()));
-        notificationListenerAdapter.listen(mountNotifService, notificationPaths);
-
-        return RestconfFuture.of(Optional.of(Builders.containerBuilder()
-            .withNodeIdentifier(new NodeIdentifier(SubscribeDeviceNotificationOutput.QNAME))
-            .withChild(ImmutableNodes.leafNode(DEVICE_NOTIFICATION_STREAM_PATH,
-                baseUrl + notificationListenerAdapter.name()))
-            .build()));
+                mountPointService, mountPoint.getIdentifier()))
+            .transform(stream -> {
+                stream.listen(mountNotifService, notificationPaths);
+                return Optional.of(Builders.containerBuilder()
+                    .withNodeIdentifier(new NodeIdentifier(SubscribeDeviceNotificationOutput.QNAME))
+                    .withChild(ImmutableNodes.leafNode(DEVICE_NOTIFICATION_STREAM_PATH_NODEID,
+                        baseStreamsUri + '/' + stream.name()))
+                    .build());
+            });
     }
 
     /**
@@ -581,6 +461,7 @@ public abstract sealed class ListenersBroker {
      * @param data Container with stream settings (RPC create-stream).
      * @return Parsed {@link NotificationOutputType}.
      */
+    @Deprecated(forRemoval = true)
     private static NotificationOutputType prepareOutputType(final ContainerNode data) {
         final String outputName = extractStringLeaf(data, OUTPUT_TYPE_NODEID);
         return outputName != null ? NotificationOutputType.valueOf(outputName) : NotificationOutputType.XML;
@@ -607,4 +488,26 @@ public abstract sealed class ListenersBroker {
         return data.childByArg(childName) instanceof LeafNode<?> leafNode && leafNode.body() instanceof String str
             ? str : null;
     }
+
+    @VisibleForTesting
+    static @NonNull MapEntryNode streamEntry(final String name, final String description, final String location,
+            final String outputType) {
+        return Builders.mapEntryBuilder()
+            .withNodeIdentifier(NodeIdentifierWithPredicates.of(Stream.QNAME, NAME_QNAME, name))
+            .withChild(ImmutableNodes.leafNode(NAME_QNAME, name))
+            .withChild(ImmutableNodes.leafNode(DESCRIPTION_QNAME, description))
+            .withChild(createAccessList(outputType, location))
+            .build();
+    }
+
+    private static MapNode createAccessList(final String outputType, final String location) {
+        return Builders.mapBuilder()
+            .withNodeIdentifier(new NodeIdentifier(Access.QNAME))
+            .withChild(Builders.mapEntryBuilder()
+                .withNodeIdentifier(NodeIdentifierWithPredicates.of(Access.QNAME, ENCODING_QNAME, outputType))
+                .withChild(ImmutableNodes.leafNode(ENCODING_QNAME, outputType))
+                .withChild(ImmutableNodes.leafNode(LOCATION_QNAME, location))
+                .build())
+            .build();
+    }
 }
diff --git a/restconf/restconf-nb/src/main/java/org/opendaylight/restconf/nb/rfc8040/streams/RestconfStateStreams.java b/restconf/restconf-nb/src/main/java/org/opendaylight/restconf/nb/rfc8040/streams/RestconfStateStreams.java
deleted file mode 100644 (file)
index 9f4fe2a..0000000
+++ /dev/null
@@ -1,106 +0,0 @@
-/*
- * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-package org.opendaylight.restconf.nb.rfc8040.streams;
-
-import static org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.restconf.monitoring.rev170126.$YangModuleInfoImpl.qnameOf;
-
-import com.google.common.annotations.VisibleForTesting;
-import java.net.URI;
-import java.util.Set;
-import java.util.stream.Collectors;
-import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.restconf.monitoring.rev170126.restconf.state.streams.Stream;
-import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.restconf.monitoring.rev170126.restconf.state.streams.stream.Access;
-import org.opendaylight.yangtools.yang.common.QName;
-import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
-import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifier;
-import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifierWithPredicates;
-import org.opendaylight.yangtools.yang.data.api.schema.MapEntryNode;
-import org.opendaylight.yangtools.yang.data.api.schema.MapNode;
-import org.opendaylight.yangtools.yang.data.impl.schema.Builders;
-import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
-import org.opendaylight.yangtools.yang.data.util.DataSchemaContext;
-import org.opendaylight.yangtools.yang.data.util.DataSchemaContextTree;
-import org.opendaylight.yangtools.yang.model.api.DataSchemaNode;
-import org.opendaylight.yangtools.yang.model.api.EffectiveModelContext;
-
-/**
- * Utilities for creating the content of {@code /ietf-restconf-monitoring:restconf-state/streams}.
- */
-public final class RestconfStateStreams {
-    @VisibleForTesting
-    static final QName DESCRIPTION_QNAME = qnameOf("description");
-    @VisibleForTesting
-    static final QName ENCODING_QNAME = qnameOf("encoding");
-    @VisibleForTesting
-    static final QName LOCATION_QNAME = qnameOf("location");
-    static final QName NAME_QNAME = qnameOf("name");
-
-    private RestconfStateStreams() {
-        // Hidden on purpose
-    }
-
-    /**
-     * Map data of YANG notification stream to a {@link MapEntryNode} according to {@code ietf-restconf-monitoring}.
-     *
-     * @param streamName stream name
-     * @param qnames Notification QNames to listen on
-     * @param outputType output type of notification
-     * @param uri location of registered listener for sending data of notification
-     * @return mapped data of notification - map entry node if parent exists,
-     *         container streams with list and map entry node if not
-     */
-    public static MapEntryNode notificationStreamEntry(final String streamName, final Set<QName> qnames,
-            final String outputType, final URI uri) {
-        return Builders.mapEntryBuilder()
-            .withNodeIdentifier(NodeIdentifierWithPredicates.of(Stream.QNAME, NAME_QNAME, streamName))
-            .withChild(ImmutableNodes.leafNode(NAME_QNAME, streamName))
-            .withChild(ImmutableNodes.leafNode(DESCRIPTION_QNAME, qnames.stream()
-                .map(QName::toString)
-                .collect(Collectors.joining(","))))
-            .withChild(createAccessList(outputType, uri))
-            .build();
-    }
-
-    /**
-     * Map data of data change notification to normalized node according to ietf-restconf-monitoring.
-     *
-     * @param path path of data to listen on
-     * @param outputType output type of notification
-     * @param uri location of registered listener for sending data of notification
-     * @param schemaContext schemaContext for parsing instance identifier to get schema node of data
-     * @return mapped data of notification - map entry node if parent exists,
-     *         container streams with list and map entry node if not
-     */
-    public static MapEntryNode dataChangeStreamEntry(final YangInstanceIdentifier path,
-            final String outputType, final URI uri, final EffectiveModelContext schemaContext,
-            final String streamName) {
-        final var streamEntry = Builders.mapEntryBuilder()
-                .withNodeIdentifier(NodeIdentifierWithPredicates.of(Stream.QNAME, NAME_QNAME, streamName))
-                .withChild(ImmutableNodes.leafNode(NAME_QNAME, streamName));
-
-        DataSchemaContextTree.from(schemaContext).findChild(path)
-            .map(DataSchemaContext::dataSchemaNode)
-            .flatMap(DataSchemaNode::getDescription)
-            .ifPresent(desc -> streamEntry.withChild(ImmutableNodes.leafNode(DESCRIPTION_QNAME, desc)));
-
-        return streamEntry
-            .withChild(createAccessList(outputType, uri))
-            .build();
-    }
-
-    private static MapNode createAccessList(final String outputType, final URI uriToWebsocketServer) {
-        return Builders.mapBuilder()
-            .withNodeIdentifier(new NodeIdentifier(Access.QNAME))
-            .withChild(Builders.mapEntryBuilder()
-                .withNodeIdentifier(NodeIdentifierWithPredicates.of(Access.QNAME, ENCODING_QNAME, outputType))
-                .withChild(ImmutableNodes.leafNode(ENCODING_QNAME, outputType))
-                .withChild(ImmutableNodes.leafNode(LOCATION_QNAME, uriToWebsocketServer.toString()))
-                .build())
-            .build();
-    }
-}
index 0f818065ec6900bd8681d23854e1a34067211242..b9f3943153edc0598318d22d43fb9044c006ecda 100644 (file)
@@ -13,25 +13,40 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertInstanceOf;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.doReturn;
 
+import java.net.URI;
 import java.util.UUID;
+import javax.ws.rs.core.UriBuilder;
+import javax.ws.rs.core.UriInfo;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Captor;
 import org.mockito.Mock;
 import org.mockito.junit.jupiter.MockitoExtension;
+import org.opendaylight.mdsal.common.api.CommitInfo;
+import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
 import org.opendaylight.mdsal.dom.api.DOMDataBroker;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeWriteTransaction;
 import org.opendaylight.restconf.common.errors.RestconfDocumentedException;
 import org.opendaylight.restconf.nb.rfc8040.databind.DatabindContext;
 import org.opendaylight.restconf.nb.rfc8040.databind.DatabindProvider;
+import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.restconf.monitoring.rev170126.restconf.state.streams.stream.Access;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.remote.rev140114.CreateDataChangeEventSubscriptionOutput;
 import org.opendaylight.yangtools.yang.common.ErrorTag;
 import org.opendaylight.yangtools.yang.common.ErrorType;
 import org.opendaylight.yangtools.yang.common.QName;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifier;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifierWithPredicates;
 import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
 import org.opendaylight.yangtools.yang.data.api.schema.LeafNode;
+import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
 import org.opendaylight.yangtools.yang.data.impl.schema.Builders;
 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
 import org.opendaylight.yangtools.yang.model.api.ContainerLike;
@@ -46,6 +61,18 @@ class ListenersBrokerTest {
 
     @Mock
     private DOMDataBroker dataBroker;
+    @Mock
+    private DOMDataTreeWriteTransaction tx;
+    @Mock
+    private UriInfo uriInfo;
+    @Mock
+    private UriBuilder uriBuilder;
+    @Captor
+    private ArgumentCaptor<String> uriCaptor;
+    @Captor
+    private ArgumentCaptor<YangInstanceIdentifier> pathCaptor;
+    @Captor
+    private ArgumentCaptor<NormalizedNode> dataCaptor;
 
     private ListenersBroker listenersBroker;
     private DatabindProvider databindProvider;
@@ -58,8 +85,16 @@ class ListenersBrokerTest {
 
     @Test
     void createStreamTest() {
+        doReturn(tx).when(dataBroker).newWriteOnlyTransaction();
+        doNothing().when(tx).put(eq(LogicalDatastoreType.OPERATIONAL), pathCaptor.capture(), dataCaptor.capture());
+        doReturn(CommitInfo.emptyFluentFuture()).when(tx).commit();
+
+        doReturn(uriBuilder).when(uriInfo).getBaseUriBuilder();
+        doReturn(uriBuilder).when(uriBuilder).replacePath(uriCaptor.capture());
+        doAnswer(inv -> new URI(uriCaptor.getValue())).when(uriBuilder).build();
+
         final var output = assertInstanceOf(ContainerNode.class, listenersBroker.createDataChangeNotifiStream(
-            databindProvider, prepareDomPayload("create-data-change-event-subscription", "toaster", "path"),
+            databindProvider, uriInfo, prepareDomPayload("create-data-change-event-subscription", "toaster", "path"),
             SCHEMA_CTX).getOrThrow().orElse(null));
 
         assertEquals(new NodeIdentifier(CreateDataChangeEventSubscriptionOutput.QNAME), output.name());
@@ -72,13 +107,41 @@ class ListenersBrokerTest {
         assertEquals(45, name.length());
         assertThat(name, startsWith("urn:uuid:"));
         assertNotNull(UUID.fromString(name.substring(9)));
+
+        final var rcStream = QName.create("urn:ietf:params:xml:ns:yang:ietf-restconf-monitoring", "2017-01-26",
+            "stream");
+        final var rcName = QName.create(rcStream, "name");
+        final var streamId = NodeIdentifierWithPredicates.of(rcStream, rcName, name);
+        final var rcEncoding = QName.create(rcStream, "encoding");
+
+        assertEquals(YangInstanceIdentifier.of(
+            new NodeIdentifier(QName.create(rcStream, "restconf-state")),
+            new NodeIdentifier(QName.create(rcStream, "streams")),
+            new NodeIdentifier(rcStream),
+            streamId), pathCaptor.getValue());
+        assertEquals(Builders.mapEntryBuilder()
+            .withNodeIdentifier(streamId)
+            .withChild(ImmutableNodes.leafNode(rcName, name))
+            .withChild(ImmutableNodes.leafNode(QName.create(rcStream, "description"),
+                "Events occuring in CONFIGURATION datastore under /toaster:toaster"))
+            .withChild(Builders.mapBuilder()
+                .withNodeIdentifier(new NodeIdentifier(Access.QNAME))
+                .withChild(Builders.mapEntryBuilder()
+                    .withNodeIdentifier(NodeIdentifierWithPredicates.of(Access.QNAME, rcEncoding, ""))
+                    .withChild(ImmutableNodes.leafNode(rcEncoding, ""))
+                    .withChild(ImmutableNodes.leafNode(QName.create(rcStream, "location"),
+                        "rests/streams/" + name))
+                    .build())
+                .build())
+            .build(), dataCaptor.getValue());
     }
 
     @Test
     void createStreamWrongValueTest() {
         final var payload = prepareDomPayload("create-data-change-event-subscription", "String value", "path");
         final var errors = assertThrows(RestconfDocumentedException.class,
-            () -> listenersBroker.createDataChangeNotifiStream(databindProvider, payload, SCHEMA_CTX)).getErrors();
+            () -> listenersBroker.createDataChangeNotifiStream(databindProvider, uriInfo, payload, SCHEMA_CTX))
+            .getErrors();
         assertEquals(1, errors.size());
         final var error = errors.get(0);
         assertEquals(ErrorType.APPLICATION, error.getErrorType());
@@ -90,7 +153,8 @@ class ListenersBrokerTest {
     void createStreamWrongInputRpcTest() {
         final var payload = prepareDomPayload("create-data-change-event-subscription2", "toaster", "path2");
         final var errors = assertThrows(RestconfDocumentedException.class,
-            () -> listenersBroker.createDataChangeNotifiStream(databindProvider, payload, SCHEMA_CTX)).getErrors();
+            () -> listenersBroker.createDataChangeNotifiStream(databindProvider, uriInfo, payload, SCHEMA_CTX))
+            .getErrors();
         assertEquals(1, errors.size());
         final var error = errors.get(0);
         assertEquals(ErrorType.APPLICATION, error.getErrorType());
index 7d29be1db89d658b09a2d79ebfc165933925559b..1f01a33350feb166eaed6547016664997ed0663e 100644 (file)
@@ -7,22 +7,18 @@
  */
 package org.opendaylight.restconf.nb.rfc8040.streams;
 
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
 
-import java.net.URI;
 import java.util.HashMap;
 import java.util.Map;
-import java.util.Set;
-import org.junit.BeforeClass;
-import org.junit.Test;
+import org.junit.jupiter.api.Test;
 import org.opendaylight.restconf.nb.rfc8040.utils.parser.ParserIdentifier;
 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.yang.library.rev190104.module.list.Module;
 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.yang.library.rev190104.module.list.module.Deviation;
 import org.opendaylight.yangtools.yang.common.QName;
 import org.opendaylight.yangtools.yang.common.Revision;
-import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
 import org.opendaylight.yangtools.yang.data.api.schema.LeafNode;
 import org.opendaylight.yangtools.yang.data.api.schema.MapEntryNode;
@@ -35,49 +31,42 @@ import org.slf4j.LoggerFactory;
 /**
  * Unit tests for {@link RestconfStateStreams}.
  */
-public class RestconfStateStreamsTest {
+class RestconfStateStreamsTest {
     private static final Logger LOG = LoggerFactory.getLogger(RestconfStateStreamsTest.class);
 
-    private static EffectiveModelContext schemaContext;
-    private static EffectiveModelContext schemaContextMonitoring;
-
-    @BeforeClass
-    public static void loadTestSchemaContextAndModules() throws Exception {
-        // FIXME: assemble these from dependencies
-        schemaContext = YangParserTestUtils.parseYangResourceDirectory("/modules/restconf-module-testing");
-        schemaContextMonitoring = YangParserTestUtils.parseYangResourceDirectory("/modules");
-    }
+    // FIXME: assemble these from dependencies
+    private static EffectiveModelContext schemaContext =
+        YangParserTestUtils.parseYangResourceDirectory("/modules/restconf-module-testing");
+    private static EffectiveModelContext schemaContextMonitoring =
+        YangParserTestUtils.parseYangResourceDirectory("/modules");
 
     @Test
-    public void toStreamEntryNodeTest() throws Exception {
-        final YangInstanceIdentifier path = ParserIdentifier.toInstanceIdentifier(
+    void toStreamEntryNodeTest() throws Exception {
+        final var path = ParserIdentifier.toInstanceIdentifier(
                 "nested-module:depth1-cont/depth2-leaf1", schemaContextMonitoring, null).getInstanceIdentifier();
-        final String outputType = "XML";
-        final URI uri = new URI("uri");
-        final String streamName = "/nested-module:depth1-cont/depth2-leaf1";
+        final var outputType = "XML";
+        final var uri = "uri";
+        final var streamName = "/nested-module:depth1-cont/depth2-leaf1";
 
         assertMappedData(prepareMap(streamName, uri, outputType),
-            RestconfStateStreams.dataChangeStreamEntry(path, outputType, uri, schemaContextMonitoring, streamName));
+            ListenersBroker.streamEntry(streamName, "description", "location", outputType));
     }
 
     @Test
-    public void toStreamEntryNodeNotifiTest() throws Exception {
-        final String outputType = "JSON";
-        final URI uri = new URI("uri");
-
-        final var map = prepareMap("notifi", uri, outputType);
-        map.put(RestconfStateStreams.DESCRIPTION_QNAME, "(urn:nested:module?revision=2014-06-03)notifi");
+    void toStreamEntryNodeNotifiTest() throws Exception {
+        final var outputType = "JSON";
+        final var uri = "uri";
 
-        assertMappedData(map, RestconfStateStreams.notificationStreamEntry("notifi",
-            Set.of(QName.create("urn:nested:module", "2014-06-03", "notifi")), outputType, uri));
+        assertMappedData(prepareMap("notifi", uri, outputType),
+            ListenersBroker.streamEntry("notifi", "description", "location", outputType));
     }
 
-    private static Map<QName, Object> prepareMap(final String name, final URI uri, final String outputType) {
-        final var map = new HashMap<QName, Object>();
-        map.put(RestconfStateStreams.NAME_QNAME, name);
-        map.put(RestconfStateStreams.LOCATION_QNAME, uri.toString());
-        map.put(RestconfStateStreams.ENCODING_QNAME, outputType);
-        return map;
+    private static Map<QName, Object> prepareMap(final String name, final String uri, final String outputType) {
+        return Map.of(
+            ListenersBroker.NAME_QNAME, name,
+            ListenersBroker.LOCATION_QNAME, uri,
+            ListenersBroker.ENCODING_QNAME, outputType,
+            ListenersBroker.DESCRIPTION_QNAME, "description");
     }
 
     private static void assertMappedData(final Map<QName, Object> map, final MapEntryNode mappedData) {
@@ -96,6 +85,7 @@ public class RestconfStateStreamsTest {
      * @param containerNode
      *             modules
      */
+    // FIXME: what is this supposed to verify?
     private static void verifyDeviations(final ContainerNode containerNode) {
         int deviationsFound = 0;
         for (var child : containerNode.body()) {
@@ -118,8 +108,9 @@ public class RestconfStateStreamsTest {
      * @param containerNode
      *             modules
      */
+    // FIXME: what is this supposed to verify?
     private static void verifyLoadedModules(final ContainerNode containerNode) {
-        final Map<String, String> loadedModules = new HashMap<>();
+        final var loadedModules = new HashMap<String, String>();
 
         for (var child : containerNode.body()) {
             if (child instanceof LeafNode) {
@@ -150,12 +141,12 @@ public class RestconfStateStreamsTest {
         }
 
         final var expectedModules = schemaContext.getModules();
-        assertEquals("Number of loaded modules is not as expected", expectedModules.size(), loadedModules.size());
+        assertEquals(expectedModules.size(), loadedModules.size());
         for (var m : expectedModules) {
             final String name = m.getName();
             final String revision = loadedModules.get(name);
             assertNotNull("Expected module not found", revision);
-            assertEquals("Incorrect revision of loaded module", Revision.ofNullable(revision), m.getRevision());
+            assertEquals(Revision.ofNullable(revision), m.getRevision());
 
             loadedModules.remove(name);
         }
diff --git a/restconf/restconf-nb/src/test/java/org/opendaylight/restconf/nb/rfc8040/streams/RestconfStreamsSubscriptionServiceImplTest.java b/restconf/restconf-nb/src/test/java/org/opendaylight/restconf/nb/rfc8040/streams/RestconfStreamsSubscriptionServiceImplTest.java
deleted file mode 100644 (file)
index 393f700..0000000
+++ /dev/null
@@ -1,136 +0,0 @@
-/*
- * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-package org.opendaylight.restconf.nb.rfc8040.streams;
-
-//import static org.junit.Assert.assertEquals;
-//import static org.junit.Assert.assertThrows;
-//import static org.mockito.ArgumentMatchers.any;
-//import static org.mockito.Mockito.doReturn;
-//import static org.mockito.Mockito.mock;
-//
-//import com.google.common.collect.ImmutableClassToInstanceMap;
-//import java.net.URI;
-//import javax.ws.rs.core.MultivaluedHashMap;
-//import javax.ws.rs.core.UriBuilder;
-//import javax.ws.rs.core.UriInfo;
-//import org.junit.Before;
-//import org.junit.Test;
-//import org.junit.runner.RunWith;
-//import org.mockito.Mock;
-//import org.mockito.junit.MockitoJUnitRunner;
-//import org.opendaylight.mdsal.common.api.CommitInfo;
-//import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
-//import org.opendaylight.mdsal.dom.api.DOMDataBroker;
-//import org.opendaylight.mdsal.dom.api.DOMDataTreeChangeService;
-//import org.opendaylight.mdsal.dom.api.DOMDataTreeWriteTransaction;
-//import org.opendaylight.mdsal.dom.api.DOMNotificationService;
-//import org.opendaylight.restconf.common.errors.RestconfDocumentedException;
-//import org.opendaylight.restconf.nb.rfc8040.databind.DatabindContext;
-//import org.opendaylight.restconf.nb.rfc8040.databind.DatabindProvider;
-//import org.opendaylight.restconf.nb.rfc8040.streams.ListenersBroker;
-//import org.opendaylight.restconf.nb.rfc8040.utils.parser.IdentifierCodec;
-//import org.opendaylight.yang.gen.v1.urn.sal.restconf.event.subscription.rev140708
-// .CreateDataChangeEventSubscriptionInput1.Scope;
-//import org.opendaylight.yang.gen.v1.urn.sal.restconf.event.subscription.rev140708.NotificationOutputTypeGrouping
-// .NotificationOutputType;
-//import org.opendaylight.yangtools.concepts.ListenerRegistration;
-//import org.opendaylight.yangtools.yang.common.ErrorTag;
-//import org.opendaylight.yangtools.yang.common.ErrorType;
-
-//@RunWith(MockitoJUnitRunner.StrictStubs.class)
-public class RestconfStreamsSubscriptionServiceImplTest extends AbstractNotificationListenerTest {
-//    private static final String URI = "/rests/data/ietf-restconf-monitoring:restconf-state/streams/stream/"
-//            + "toaster:toaster/toasterStatus/datastore=OPERATIONAL/scope=ONE";
-//
-//    @Mock
-//    private DOMDataBroker dataBroker;
-//    @Mock
-//    private UriInfo uriInfo;
-//    @Mock
-//    private DOMNotificationService notificationService;
-//
-//    private final DatabindProvider databindProvider = () -> DatabindContext.ofModel(MODEL_CONTEXT);
-//
-//    @Before
-//    public void setUp() throws Exception {
-//        final var wTx = mock(DOMDataTreeWriteTransaction.class);
-//        doReturn(wTx).when(dataBroker).newWriteOnlyTransaction();
-//        doReturn(CommitInfo.emptyFluentFuture()).when(wTx).commit();
-//
-//        final var dataTreeChangeService = mock(DOMDataTreeChangeService.class);
-//        doReturn(mock(ListenerRegistration.class)).when(dataTreeChangeService)
-//                .registerDataTreeChangeListener(any(), any());
-//
-//        doReturn(ImmutableClassToInstanceMap.of(DOMDataTreeChangeService.class, dataTreeChangeService))
-//                .when(dataBroker).getExtensions();
-//
-//        doReturn(new MultivaluedHashMap<>()).when(uriInfo).getQueryParameters();
-//        doReturn(UriBuilder.fromUri("http://localhost:8181")).when(uriInfo).getBaseUriBuilder();
-//        doReturn(new URI("http://127.0.0.1/" + URI)).when(uriInfo).getAbsolutePath();
-//    }
-//
-//    @Test
-//    public void testSubscribeToStreamSSE() {
-//        final var listenersBroker = new ListenersBroker.ServerSentEvents();
-//        listenersBroker.registerDataChangeListener(MODEL_CONTEXT, LogicalDatastoreType.OPERATIONAL,
-//            IdentifierCodec.deserialize("toaster:toaster/toasterStatus", MODEL_CONTEXT), Scope.ONE,
-//            NotificationOutputType.XML);
-//        final var streamsSubscriptionService = new RestconfStreamsSubscriptionServiceImpl(dataBroker,
-//            notificationService, databindProvider,listenersBroker);
-//        final var response = streamsSubscriptionService.subscribeToStream(
-//            "data-change-event-subscription/toaster:toaster/toasterStatus/datastore=OPERATIONAL/scope=ONE", uriInfo);
-//        assertEquals("http://localhost:8181/rests/streams"
-//            + "/data-change-event-subscription/toaster:toaster/toasterStatus/datastore=OPERATIONAL/scope=ONE",
-//            response.getLocation().toString());
-//    }
-//
-//    @Test
-//    public void testSubscribeToStreamWS() {
-//        final var listenersBroker = new ListenersBroker.WebSockets();
-//        listenersBroker.registerDataChangeListener(MODEL_CONTEXT, LogicalDatastoreType.OPERATIONAL,
-//            IdentifierCodec.deserialize("toaster:toaster/toasterStatus", MODEL_CONTEXT), Scope.ONE,
-//            NotificationOutputType.XML);
-//        final var streamsSubscriptionService = new RestconfStreamsSubscriptionServiceImpl(dataBroker,
-//            notificationService, databindProvider, listenersBroker);
-//        final var response = streamsSubscriptionService.subscribeToStream(
-//            "data-change-event-subscription/toaster:toaster/toasterStatus/datastore=OPERATIONAL/scope=ONE", uriInfo);
-//        assertEquals("ws://localhost:8181/rests/streams"
-//            + "/data-change-event-subscription/toaster:toaster/toasterStatus/datastore=OPERATIONAL/scope=ONE",
-//            response.getLocation().toString());
-//    }
-//
-//    @Test
-//    public void testSubscribeToStreamMissingDatastoreInPath() {
-//        final var listenersBroker = new ListenersBroker.WebSockets();
-//        final var streamsSubscriptionService = new RestconfStreamsSubscriptionServiceImpl(dataBroker,
-//            notificationService, databindProvider, listenersBroker);
-//        final var errors = assertThrows(RestconfDocumentedException.class,
-//            () -> streamsSubscriptionService.subscribeToStream("toaster:toaster/toasterStatus/scope=ONE", uriInfo))
-//            .getErrors();
-//        assertEquals(1, errors.size());
-//        final var error = errors.get(0);
-//        assertEquals(ErrorType.APPLICATION, error.getErrorType());
-//        assertEquals(ErrorTag.OPERATION_FAILED, error.getErrorTag());
-//        assertEquals("Bad type of notification of sal-remote", error.getErrorMessage());
-//    }
-//
-//    @Test
-//    public void testSubscribeToStreamMissingScopeInPath() {
-//        final var listenersBroker = new ListenersBroker.WebSockets();
-//        final var streamsSubscriptionService = new RestconfStreamsSubscriptionServiceImpl(dataBroker,
-//            notificationService, databindProvider, listenersBroker);
-//        final var errors = assertThrows(RestconfDocumentedException.class,
-//            () -> streamsSubscriptionService.subscribeToStream("toaster:toaster/toasterStatus/datastore=OPERATIONAL",
-//                uriInfo)).getErrors();
-//        assertEquals(1, errors.size());
-//        final var error = errors.get(0);
-//        assertEquals(ErrorType.APPLICATION, error.getErrorType());
-//        assertEquals(ErrorTag.OPERATION_FAILED, error.getErrorTag());
-//        assertEquals("Bad type of notification of sal-remote", error.getErrorMessage());
-//    }
-}
index c1658244a1d89a7261302c649bf71ab3e9440d0b..41e15fc992cdb4e5e8c17ed8472416cf04a3e407 100644 (file)
@@ -21,8 +21,10 @@ import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.ExtendWith;
 import org.mockito.Mock;
 import org.mockito.junit.jupiter.MockitoExtension;
+import org.opendaylight.mdsal.common.api.CommitInfo;
 import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
 import org.opendaylight.mdsal.dom.api.DOMDataBroker;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeWriteTransaction;
 import org.opendaylight.restconf.nb.rfc8040.databind.DatabindProvider;
 import org.opendaylight.yang.gen.v1.urn.sal.restconf.event.subscription.rev231103.NotificationOutputTypeGrouping.NotificationOutputType;
 import org.opendaylight.yangtools.yang.common.QName;
@@ -39,6 +41,8 @@ class WebSocketFactoryTest extends AbstractNotificationListenerTest {
     @Mock
     private DOMDataBroker dataBroker;
     @Mock
+    private DOMDataTreeWriteTransaction tx;
+    @Mock
     private DatabindProvider databindProvider;
 
     private ListenersBroker listenersBroker;
@@ -47,12 +51,17 @@ class WebSocketFactoryTest extends AbstractNotificationListenerTest {
 
     @BeforeEach
     void prepareListenersBroker() {
+        doReturn(tx).when(dataBroker).newWriteOnlyTransaction();
+        doReturn(CommitInfo.emptyFluentFuture()).when(tx).commit();
+
         listenersBroker = new ListenersBroker.ServerSentEvents(dataBroker);
         webSocketFactory = new WebSocketFactory(execService, listenersBroker, 5000, 2000);
 
-        streamName = listenersBroker.createStream(name -> new DataTreeChangeStream(listenersBroker, name,
-            NotificationOutputType.JSON, databindProvider, LogicalDatastoreType.CONFIGURATION,
-            YangInstanceIdentifier.of(QName.create("http://netconfcentral.org/ns/toaster", "2009-11-20", "toaster"))))
+        streamName = listenersBroker.createStream("description", "streams",
+            name -> new DataTreeChangeStream(listenersBroker, name, NotificationOutputType.JSON, databindProvider,
+                LogicalDatastoreType.CONFIGURATION, YangInstanceIdentifier.of(
+                    QName.create("http://netconfcentral.org/ns/toaster", "2009-11-20", "toaster"))))
+            .getOrThrow()
             .name();
     }