2 * Copyright © 2019 FRINX s.r.o. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.restconf.nb.rfc8040.streams;
10 import static java.util.Objects.requireNonNull;
12 import com.google.common.collect.ImmutableSet;
13 import com.google.common.util.concurrent.FutureCallback;
14 import com.google.common.util.concurrent.MoreExecutors;
16 import java.util.Optional;
17 import java.util.UUID;
18 import java.util.concurrent.ConcurrentHashMap;
19 import java.util.concurrent.ConcurrentMap;
20 import javax.ws.rs.core.UriInfo;
21 import org.eclipse.jdt.annotation.NonNull;
22 import org.eclipse.jdt.annotation.Nullable;
23 import org.opendaylight.mdsal.common.api.CommitInfo;
24 import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
25 import org.opendaylight.mdsal.dom.api.DOMDataBroker;
26 import org.opendaylight.mdsal.dom.api.DOMMountPoint;
27 import org.opendaylight.mdsal.dom.api.DOMMountPointService;
28 import org.opendaylight.mdsal.dom.api.DOMNotificationService;
29 import org.opendaylight.mdsal.dom.api.DOMRpcResult;
30 import org.opendaylight.mdsal.dom.api.DOMSchemaService;
31 import org.opendaylight.restconf.common.errors.RestconfDocumentedException;
32 import org.opendaylight.restconf.common.errors.RestconfFuture;
33 import org.opendaylight.restconf.nb.rfc8040.URLConstants;
34 import org.opendaylight.restconf.nb.rfc8040.databind.DatabindProvider;
35 import org.opendaylight.yang.gen.v1.urn.opendaylight.device.notification.rev221106.SubscribeDeviceNotificationInput;
36 import org.opendaylight.yang.gen.v1.urn.opendaylight.device.notification.rev221106.SubscribeDeviceNotificationOutput;
37 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.remote.rev140114.CreateDataChangeEventSubscriptionInput;
38 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.remote.rev140114.CreateDataChangeEventSubscriptionOutput;
39 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.remote.rev140114.CreateNotificationStreamInput;
40 import org.opendaylight.yang.gen.v1.urn.sal.restconf.event.subscription.rev140708.CreateDataChangeEventSubscriptionInput1;
41 import org.opendaylight.yang.gen.v1.urn.sal.restconf.event.subscription.rev140708.NotificationOutputTypeGrouping;
42 import org.opendaylight.yang.gen.v1.urn.sal.restconf.event.subscription.rev140708.NotificationOutputTypeGrouping.NotificationOutputType;
43 import org.opendaylight.yangtools.yang.common.ErrorTag;
44 import org.opendaylight.yangtools.yang.common.ErrorType;
45 import org.opendaylight.yangtools.yang.common.QName;
46 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
47 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifier;
48 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifierWithPredicates;
49 import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
50 import org.opendaylight.yangtools.yang.data.api.schema.DataContainerChild;
51 import org.opendaylight.yangtools.yang.data.api.schema.LeafNode;
52 import org.opendaylight.yangtools.yang.data.api.schema.LeafSetEntryNode;
53 import org.opendaylight.yangtools.yang.data.api.schema.LeafSetNode;
54 import org.opendaylight.yangtools.yang.data.impl.schema.Builders;
55 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
56 import org.opendaylight.yangtools.yang.model.api.EffectiveModelContext;
57 import org.opendaylight.yangtools.yang.model.api.stmt.NotificationEffectiveStatement;
58 import org.opendaylight.yangtools.yang.model.api.stmt.SchemaNodeIdentifier.Absolute;
59 import org.slf4j.Logger;
60 import org.slf4j.LoggerFactory;
63 * This singleton class is responsible for creation, removal and searching for {@link ListenerAdapter} or
64 * {@link NotificationListenerAdapter} listeners.
66 // FIXME: furthermore, this should be tied to ietf-restconf-monitoring, as the Strings used in its maps are stream
67 // names. We essentially need a component which deals with allocation of stream names and their lifecycle and
68 // the contents of /restconf-state/streams.
69 public abstract sealed class ListenersBroker {
71 * A ListenersBroker working with Server-Sent Events.
73 public static final class ServerSentEvents extends ListenersBroker {
74 public ServerSentEvents(final DOMDataBroker dataBroker) {
79 public URI prepareUriByStreamName(final UriInfo uriInfo, final String streamName) {
80 return uriInfo.getBaseUriBuilder()
81 .replacePath(URLConstants.BASE_PATH + '/' + URLConstants.STREAMS_SUBPATH + '/' + streamName)
87 * A ListenersBroker working with WebSockets.
89 public static final class WebSockets extends ListenersBroker {
90 public WebSockets(final DOMDataBroker dataBroker) {
95 public URI prepareUriByStreamName(final UriInfo uriInfo, final String streamName) {
96 final var scheme = switch (uriInfo.getAbsolutePath().getScheme()) {
97 // Secured HTTP goes to Secured WebSockets
98 case "https" -> "wss";
99 // Unsecured HTTP and others go to unsecured WebSockets
103 return uriInfo.getBaseUriBuilder()
105 .replacePath(URLConstants.BASE_PATH + '/' + URLConstants.STREAMS_SUBPATH + '/' + streamName)
111 * Factory interface for creating instances of {@link AbstractStream}.
113 * @param <T> {@link AbstractStream} type
116 public interface StreamFactory<T extends AbstractStream<?>> {
118 * Create a stream with the supplied name.
120 * @param name Stream name
121 * @return An {@link AbstractStream}
123 @NonNull T createStream(@NonNull String name);
127 * Holder of all handlers for notifications.
129 // FIXME: why do we even need this class?!
130 private record HandlersHolder(
131 @NonNull DOMDataBroker dataBroker,
132 @NonNull DOMNotificationService notificationService,
133 @NonNull DatabindProvider databindProvider) {
136 requireNonNull(dataBroker);
137 requireNonNull(notificationService);
138 requireNonNull(databindProvider);
142 // private static final QName LOCATION_QNAME = QName.create(Notifi.QNAME, "location").intern();
143 // private static final NodeIdentifier LOCATION_NODEID = NodeIdentifier.create(LOCATION_QNAME);
144 // private static final String STREAMS_PATH = "ietf-restconf-monitoring:restconf-state/streams";
145 // private static final String STREAM_PATH_PART = "/stream=";
146 // private static final String STREAM_PATH = STREAMS_PATH + STREAM_PATH_PART;
147 // private static final String STREAM_ACCESS_PATH_PART = "/access=";
148 // private static final String STREAM_LOCATION_PATH_PART = "/location";
150 // private final ListenersBroker listenersBroker;
151 // private final HandlersHolder handlersHolder;
153 // // FIXME: NETCONF:1102: do not instantiate this service
154 // new RestconfStreamsSubscriptionServiceImpl(dataBroker, notificationService, databindProvider,
158 // * Initialize holder of handlers with holders as parameters.
160 // * @param dataBroker {@link DOMDataBroker}
161 // * @param notificationService {@link DOMNotificationService}
162 // * @param databindProvider a {@link DatabindProvider}
163 // * @param listenersBroker a {@link ListenersBroker}
165 // public RestconfStreamsSubscriptionServiceImpl(final DOMDataBroker dataBroker,
166 // final DOMNotificationService notificationService, final DatabindProvider databindProvider,
167 // final ListenersBroker listenersBroker) {
168 // handlersHolder = new HandlersHolder(dataBroker, notificationService, databindProvider);
169 // this.listenersBroker = requireNonNull(listenersBroker);
173 // public Response subscribeToStream(final String identifier, final UriInfo uriInfo) {
174 // final var params = QueryParams.newReceiveEventsParams(uriInfo);
176 // final URI location;
177 // if (identifier.contains(RestconfStreamsConstants.DATA_SUBSCRIPTION)) {
178 // location = listenersBroker.subscribeToDataStream(identifier, uriInfo, params, handlersHolder);
179 // } else if (identifier.contains(RestconfStreamsConstants.NOTIFICATION_STREAM)) {
180 // location = listenersBroker.subscribeToYangStream(identifier, uriInfo, params, handlersHolder);
182 // final String msg = "Bad type of notification of sal-remote";
184 // throw new RestconfDocumentedException(msg);
187 // return Response.ok()
188 // .location(location)
189 // .entity(new NormalizedNodePayload(
190 // Inference.ofDataTreePath(handlersHolder.databindProvider().currentContext().modelContext(),
191 // Notifi.QNAME, LOCATION_QNAME),
192 // ImmutableNodes.leafNode(LOCATION_NODEID, location.toString())))
196 private static final Logger LOG = LoggerFactory.getLogger(ListenersBroker.class);
198 private static final QName DATASTORE_QNAME =
199 QName.create(CreateDataChangeEventSubscriptionInput1.QNAME, "datastore").intern();
200 private static final QName OUTPUT_TYPE_QNAME =
201 QName.create(NotificationOutputTypeGrouping.QNAME, "notification-output-type").intern();
202 private static final QName DEVICE_NOTIFICATION_PATH_QNAME =
203 QName.create(SubscribeDeviceNotificationInput.QNAME, "path").intern();
204 private static final QName DEVICE_NOTIFICATION_STREAM_PATH =
205 QName.create(DEVICE_NOTIFICATION_PATH_QNAME, "stream-path").intern();
206 private static final NodeIdentifier DATASTORE_NODEID = NodeIdentifier.create(DATASTORE_QNAME);
207 private static final NodeIdentifier OUTPUT_TYPE_NODEID = NodeIdentifier.create(OUTPUT_TYPE_QNAME);
208 private static final NodeIdentifier DEVICE_NOTIFICATION_PATH_NODEID =
209 NodeIdentifier.create(DEVICE_NOTIFICATION_PATH_QNAME);
210 private static final NodeIdentifier SAL_REMOTE_OUTPUT_NODEID =
211 NodeIdentifier.create(CreateDataChangeEventSubscriptionOutput.QNAME);
212 private static final NodeIdentifier NOTIFICATIONS =
213 NodeIdentifier.create(QName.create(CreateNotificationStreamInput.QNAME, "notifications").intern());
214 private static final NodeIdentifier PATH_NODEID =
215 NodeIdentifier.create(QName.create(CreateDataChangeEventSubscriptionInput.QNAME, "path").intern());
216 private static final NodeIdentifier STREAM_NAME_NODEID =
217 NodeIdentifier.create(QName.create(CreateDataChangeEventSubscriptionOutput.QNAME, "stream-name").intern());
219 private final ConcurrentMap<String, AbstractStream<?>> streams = new ConcurrentHashMap<>();
220 private final DOMDataBroker dataBroker;
222 private ListenersBroker(final DOMDataBroker dataBroker) {
223 this.dataBroker = requireNonNull(dataBroker);
227 * Get an {@link AbstractStream} by its name.
229 * @param streamName Stream name.
230 * @return An {@link AbstractStream}, or {@code null} if the stream with specified name does not exist.
231 * @throws NullPointerException if {@code streamName} is {@code null}
233 public final @Nullable AbstractStream<?> getStream(final String streamName) {
234 return streams.get(streamName);
238 * Create an {@link AbstractStream} with a unique name. This method will atomically generate a stream name, create
239 * the corresponding instance and register it
241 * @param <T> Stream type
242 * @param factory Factory for creating the actual stream instance
243 * @return An {@link AbstractStream} instance
244 * @throws NullPointerException if {@code factory} is {@code null}
246 public final <T extends AbstractStream<?>> @NonNull T createStream(final StreamFactory<T> factory) {
250 // Use Type 4 (random) UUID. While we could just use it as a plain string, be nice to observers and anchor
251 // it into UUID URN namespace as defined by RFC4122
252 name = "urn:uuid:" + UUID.randomUUID().toString();
253 stream = factory.createStream(name);
254 } while (streams.putIfAbsent(name, stream) != null);
260 * Remove a particular stream and remove its entry from operational datastore.
262 * @param stream Stream to remove
264 final void removeStream(final AbstractStream<?> stream) {
265 // Defensive check to see if we are still tracking the stream
266 final var streamName = stream.getStreamName();
267 if (streams.get(streamName) != stream) {
268 LOG.warn("Stream {} does not match expected instance {}, skipping datastore update", streamName, stream);
272 // Now issue a delete operation while the name is still protected by being associated in the map.
273 final var tx = dataBroker.newWriteOnlyTransaction();
274 tx.delete(LogicalDatastoreType.OPERATIONAL, RestconfStateStreams.restconfStateStreamPath(streamName));
275 tx.commit().addCallback(new FutureCallback<CommitInfo>() {
277 public void onSuccess(final CommitInfo result) {
278 LOG.debug("Stream {} removed", streamName);
279 streams.remove(streamName, stream);
283 public void onFailure(final Throwable cause) {
284 LOG.warn("Failed to remove stream {}, operational datastore may be inconsistent", streamName, cause);
285 streams.remove(streamName, stream);
287 }, MoreExecutors.directExecutor());
291 * Creates string representation of stream name from URI. Removes slash from URI in start and end positions,
292 * and optionally {@link URLConstants#BASE_PATH} prefix.
294 * @param uri URI for creation of stream name.
295 * @return String representation of stream name.
297 // private static String createStreamNameFromUri(final String uri) {
298 // String result = requireNonNull(uri);
300 // if (result.startsWith(URLConstants.BASE_PATH)) {
301 // result = result.substring(URLConstants.BASE_PATH.length());
302 // } else if (result.startsWith("/")) {
303 // result = result.substring(1);
308 // if (result.endsWith("/")) {
309 // result = result.substring(0, result.length() - 1);
315 * Prepare URL from base name and stream name.
317 * @param uriInfo base URL information
318 * @param streamName name of stream for create
321 public abstract @NonNull URI prepareUriByStreamName(UriInfo uriInfo, String streamName);
323 // FIXME: callers are utter duplicates, refactor them
324 // private static void writeDataToDS(final DOMDataTreeWriteOperations tx, final MapEntryNode mapToStreams) {
325 // // FIXME: use put() here
326 // tx.merge(LogicalDatastoreType.OPERATIONAL, RestconfStateStreams.restconfStateStreamPath(mapToStreams.name()),
330 // private static void submitData(final DOMDataTreeWriteTransaction readWriteTransaction) {
332 // readWriteTransaction.commit().get();
333 // } catch (final InterruptedException | ExecutionException e) {
334 // throw new RestconfDocumentedException("Problem while putting data to DS.", e);
339 * Create data-change-event stream with POST operation via RPC.
341 * @param input Input of RPC - example in JSON (data-change-event stream):
346 * "path": "/toaster:toaster/toaster:toasterStatus",
347 * "sal-remote-augment:datastore": "OPERATIONAL",
348 * "sal-remote-augment:scope": "ONE"
353 * @param modelContext Reference to {@link EffectiveModelContext}.
354 * @return {@link DOMRpcResult} - Output of RPC - example in JSON:
359 * "stream-name": "toaster:toaster/toaster:toasterStatus/datastore=OPERATIONAL/scope=ONE"
365 // FIXME: this really should be a normal RPC implementation
366 public final RestconfFuture<Optional<ContainerNode>> createDataChangeNotifiStream(final ContainerNode input,
367 final EffectiveModelContext modelContext) {
368 final var datastoreName = extractStringLeaf(input, DATASTORE_NODEID);
369 final var datastore = datastoreName != null ? LogicalDatastoreType.valueOf(datastoreName)
370 : LogicalDatastoreType.CONFIGURATION;
371 final var path = preparePath(input);
372 final var outputType = prepareOutputType(input);
373 final var adapter = createStream(name -> new ListenerAdapter(name, outputType, this, datastore, path));
375 // building of output
376 return RestconfFuture.of(Optional.of(Builders.containerBuilder()
377 .withNodeIdentifier(SAL_REMOTE_OUTPUT_NODEID)
378 .withChild(ImmutableNodes.leafNode(STREAM_NAME_NODEID, adapter.getStreamName()))
383 // * Register listener by streamName in identifier to listen to data change notifications, and put or delete
384 // * information about listener to DS according to ietf-restconf-monitoring.
386 // * @param identifier Identifier as stream name.
387 // * @param uriInfo Base URI information.
388 // * @param notificationQueryParams Query parameters of notification.
389 // * @param handlersHolder Holder of handlers for notifications.
390 // * @return Location for listening.
392 // public final URI subscribeToDataStream(final String identifier, final UriInfo uriInfo,
393 // final ReceiveEventsParams notificationQueryParams, final HandlersHolder handlersHolder) {
394 // final var streamName = createStreamNameFromUri(identifier);
395 // final var listener = dataChangeListenerFor(streamName);
396 // if (listener == null) {
397 // throw new RestconfDocumentedException("No listener found for stream " + streamName,
398 // ErrorType.APPLICATION, ErrorTag.DATA_MISSING);
401 // listener.setQueryParams(notificationQueryParams);
403 // final var dataBroker = handlersHolder.dataBroker();
404 // final var schemaHandler = handlersHolder.databindProvider();
405 // listener.setCloseVars(schemaHandler);
406 // listener.listen(dataBroker);
408 // final var uri = prepareUriByStreamName(uriInfo, streamName);
409 // final var schemaContext = schemaHandler.currentContext().modelContext();
410 // final var serializedPath = IdentifierCodec.serialize(listener.getPath(), schemaContext);
412 // final var mapToStreams = RestconfStateStreams.dataChangeStreamEntry(listener.getPath(),
413 // listener.getOutputType(), uri, schemaContext, serializedPath);
414 // final var writeTransaction = dataBroker.newWriteOnlyTransaction();
415 // writeDataToDS(writeTransaction, mapToStreams);
416 // submitData(writeTransaction);
420 // FIXME: this really should be a normal RPC implementation
421 public final RestconfFuture<Optional<ContainerNode>> createNotificationStream(final ContainerNode input,
422 final EffectiveModelContext modelContext) {
423 final var qnames = ((LeafSetNode<String>) input.getChildByArg(NOTIFICATIONS)).body().stream()
424 .map(LeafSetEntryNode::body)
427 .collect(ImmutableSet.toImmutableSet());
429 for (var qname : qnames) {
430 if (modelContext.findNotification(qname).isEmpty()) {
431 throw new RestconfDocumentedException(qname + " refers to an unknown notification",
432 ErrorType.APPLICATION, ErrorTag.INVALID_VALUE);
436 // FIXME: use this block to create a stream description
437 // final var module = refSchemaCtx.findModuleStatement(qname.getModule())
438 // .orElseThrow(() -> new RestconfDocumentedException(qname + " refers to an unknown module",
439 // ErrorType.APPLICATION, ErrorTag.INVALID_VALUE));
440 // final var stmt = module.findSchemaTreeNode(qname)
441 // .orElseThrow(() -> new RestconfDocumentedException(qname + " refers to an notification",
442 // ErrorType.APPLICATION, ErrorTag.INVALID_VALUE));
443 // if (!(stmt instanceof NotificationEffectiveStatement)) {
444 // throw new RestconfDocumentedException(qname + " refers to a non-notification",
445 // ErrorType.APPLICATION, ErrorTag.INVALID_VALUE);
453 // sb.append(module.argument().getLocalName()).append(':').append(qname.getLocalName());
455 // registration of the listener
456 final var outputType = prepareOutputType(input);
457 final var adapter = createStream(name -> new NotificationListenerAdapter(name, outputType, this, qnames));
459 return RestconfFuture.of(Optional.of(Builders.containerBuilder()
460 .withNodeIdentifier(SAL_REMOTE_OUTPUT_NODEID)
461 .withChild(ImmutableNodes.leafNode(STREAM_NAME_NODEID, adapter.getStreamName()))
466 * Register listener by streamName in identifier to listen to yang notifications, and put or delete information
467 * about listener to DS according to ietf-restconf-monitoring.
469 * @param identifier Name of the stream.
470 * @param uriInfo URI information.
471 * @param notificationQueryParams Query parameters of notification.
472 * @param handlersHolder Holder of handlers for notifications.
473 * @return Stream location for listening.
475 // public final @NonNull URI subscribeToYangStream(final String identifier, final UriInfo uriInfo,
476 // final ReceiveEventsParams notificationQueryParams, final HandlersHolder handlersHolder) {
477 // final String streamName = createStreamNameFromUri(identifier);
478 // if (isNullOrEmpty(streamName)) {
479 // throw new RestconfDocumentedException("Stream name is empty", ErrorType.PROTOCOL, ErrorTag.INVALID_VALUE);
482 // final var notificationListenerAdapter = notificationListenerFor(streamName);
483 // if (notificationListenerAdapter == null) {
484 // throw new RestconfDocumentedException("Stream with name %s was not found".formatted(streamName),
485 // ErrorType.PROTOCOL, ErrorTag.UNKNOWN_ELEMENT);
488 // final URI uri = prepareUriByStreamName(uriInfo, streamName);
489 // notificationListenerAdapter.setQueryParams(notificationQueryParams);
490 // notificationListenerAdapter.listen(handlersHolder.notificationService());
491 // final DOMDataBroker dataBroker = handlersHolder.dataBroker();
492 // notificationListenerAdapter.setCloseVars(handlersHolder.databindProvider());
493 // final MapEntryNode mapToStreams = RestconfStateStreams.notificationStreamEntry(streamName,
494 // notificationListenerAdapter.qnames(), notificationListenerAdapter.getOutputType(), uri);
496 // // FIXME: how does this correlate with the transaction notificationListenerAdapter.close() will do?
497 // final DOMDataTreeWriteTransaction writeTransaction = dataBroker.newWriteOnlyTransaction();
498 // writeDataToDS(writeTransaction, mapToStreams);
499 // submitData(writeTransaction);
504 * Create device notification stream.
506 * @param baseUrl base Url
507 * @param input RPC input
508 * @param mountPointService dom mount point service
509 * @return {@link DOMRpcResult} - Output of RPC - example in JSON
511 // FIXME: this should be an RPC invocation
512 public final RestconfFuture<Optional<ContainerNode>> createDeviceNotificationStream(final ContainerNode input,
513 final String baseUrl, final DOMMountPointService mountPointService) {
514 // parsing out of container with settings and path
516 final var path = (YangInstanceIdentifier) input.findChildByArg(DEVICE_NOTIFICATION_PATH_NODEID)
517 .map(DataContainerChild::body)
518 .orElseThrow(() -> new RestconfDocumentedException("No path specified", ErrorType.APPLICATION,
519 ErrorTag.DATA_MISSING));
521 if (!(path.getLastPathArgument() instanceof NodeIdentifierWithPredicates listId)) {
522 throw new RestconfDocumentedException("Path does not refer to a list item", ErrorType.APPLICATION,
523 ErrorTag.INVALID_VALUE);
525 if (listId.size() != 1) {
526 throw new RestconfDocumentedException("Target list uses multiple keys", ErrorType.APPLICATION,
527 ErrorTag.INVALID_VALUE);
530 final DOMMountPoint mountPoint = mountPointService.getMountPoint(path)
531 .orElseThrow(() -> new RestconfDocumentedException("Mount point not available", ErrorType.APPLICATION,
532 ErrorTag.OPERATION_FAILED));
534 final DOMNotificationService mountNotifService = mountPoint.getService(DOMNotificationService.class)
535 .orElseThrow(() -> new RestconfDocumentedException("Mount point does not support notifications",
536 ErrorType.APPLICATION, ErrorTag.OPERATION_FAILED));
538 final var mountModelContext = mountPoint.getService(DOMSchemaService.class)
539 .orElseThrow(() -> new RestconfDocumentedException("Mount point schema not available",
540 ErrorType.APPLICATION, ErrorTag.OPERATION_FAILED))
542 final var notificationPaths = mountModelContext.getModuleStatements().values().stream()
543 .flatMap(module -> module.streamEffectiveSubstatements(NotificationEffectiveStatement.class))
544 .map(notification -> Absolute.of(notification.argument()))
545 .collect(ImmutableSet.toImmutableSet());
546 if (notificationPaths.isEmpty()) {
547 throw new RestconfDocumentedException("Device does not support notification", ErrorType.APPLICATION,
548 ErrorTag.OPERATION_FAILED);
551 // FIXME: use this for description?
552 // final String deviceName = listId.values().iterator().next().toString();
554 final var outputType = prepareOutputType(input);
555 final var notificationListenerAdapter = createStream(
556 streamName -> new DeviceNotificationListenerAdaptor(streamName, outputType, this, mountModelContext,
557 mountPointService, mountPoint.getIdentifier()));
558 notificationListenerAdapter.listen(mountNotifService, notificationPaths);
560 return RestconfFuture.of(Optional.of(Builders.containerBuilder()
561 .withNodeIdentifier(new NodeIdentifier(SubscribeDeviceNotificationOutput.QNAME))
562 .withChild(ImmutableNodes.leafNode(DEVICE_NOTIFICATION_STREAM_PATH,
563 baseUrl + notificationListenerAdapter.getStreamName()))
568 * Prepare {@link NotificationOutputType}.
570 * @param data Container with stream settings (RPC create-stream).
571 * @return Parsed {@link NotificationOutputType}.
573 private static NotificationOutputType prepareOutputType(final ContainerNode data) {
574 final String outputName = extractStringLeaf(data, OUTPUT_TYPE_NODEID);
575 return outputName != null ? NotificationOutputType.valueOf(outputName) : NotificationOutputType.XML;
579 * Prepare {@link YangInstanceIdentifier} of stream source.
581 * @param data Container with stream settings (RPC create-stream).
582 * @return Parsed {@link YangInstanceIdentifier} of data element from which the data-change-event notifications
583 * are going to be generated.
585 private static YangInstanceIdentifier preparePath(final ContainerNode data) {
586 final var pathLeaf = data.childByArg(PATH_NODEID);
587 if (pathLeaf != null && pathLeaf.body() instanceof YangInstanceIdentifier pathValue) {
591 throw new RestconfDocumentedException("Instance identifier was not normalized correctly",
592 ErrorType.APPLICATION, ErrorTag.OPERATION_FAILED);
595 private static @Nullable String extractStringLeaf(final ContainerNode data, final NodeIdentifier childName) {
596 return data.childByArg(childName) instanceof LeafNode<?> leafNode && leafNode.body() instanceof String str