Fix JSON and XML PatchBodyWriter errors output
[netconf.git] / restconf / restconf-nb / src / main / java / org / opendaylight / restconf / nb / rfc8040 / streams / ListenersBroker.java
1 /*
2  * Copyright © 2019 FRINX s.r.o. All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.restconf.nb.rfc8040.streams;
9
10 import static java.util.Objects.requireNonNull;
11
12 import com.google.common.annotations.VisibleForTesting;
13 import com.google.common.collect.ImmutableSet;
14 import com.google.common.util.concurrent.FutureCallback;
15 import com.google.common.util.concurrent.MoreExecutors;
16 import java.util.Optional;
17 import java.util.UUID;
18 import java.util.concurrent.ConcurrentHashMap;
19 import java.util.concurrent.ConcurrentMap;
20 import javax.ws.rs.core.UriInfo;
21 import org.eclipse.jdt.annotation.NonNull;
22 import org.eclipse.jdt.annotation.Nullable;
23 import org.opendaylight.mdsal.common.api.CommitInfo;
24 import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
25 import org.opendaylight.mdsal.dom.api.DOMDataBroker;
26 import org.opendaylight.mdsal.dom.api.DOMMountPoint;
27 import org.opendaylight.mdsal.dom.api.DOMMountPointService;
28 import org.opendaylight.mdsal.dom.api.DOMNotificationService;
29 import org.opendaylight.mdsal.dom.api.DOMRpcResult;
30 import org.opendaylight.mdsal.dom.api.DOMSchemaService;
31 import org.opendaylight.restconf.common.errors.RestconfDocumentedException;
32 import org.opendaylight.restconf.common.errors.RestconfFuture;
33 import org.opendaylight.restconf.common.errors.SettableRestconfFuture;
34 import org.opendaylight.restconf.nb.rfc8040.URLConstants;
35 import org.opendaylight.restconf.nb.rfc8040.databind.DatabindProvider;
36 import org.opendaylight.restconf.nb.rfc8040.utils.parser.IdentifierCodec;
37 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.restconf.monitoring.rev170126.RestconfState;
38 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.restconf.monitoring.rev170126.restconf.state.Streams;
39 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.restconf.monitoring.rev170126.restconf.state.streams.Stream;
40 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.restconf.monitoring.rev170126.restconf.state.streams.stream.Access;
41 import org.opendaylight.yang.gen.v1.urn.opendaylight.device.notification.rev221106.SubscribeDeviceNotificationInput;
42 import org.opendaylight.yang.gen.v1.urn.opendaylight.device.notification.rev221106.SubscribeDeviceNotificationOutput;
43 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.remote.rev140114.CreateDataChangeEventSubscriptionInput;
44 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.remote.rev140114.CreateDataChangeEventSubscriptionOutput;
45 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.remote.rev140114.CreateNotificationStreamInput;
46 import org.opendaylight.yang.gen.v1.urn.sal.restconf.event.subscription.rev231103.CreateDataChangeEventSubscriptionInput1;
47 import org.opendaylight.yang.gen.v1.urn.sal.restconf.event.subscription.rev231103.NotificationOutputTypeGrouping;
48 import org.opendaylight.yang.gen.v1.urn.sal.restconf.event.subscription.rev231103.NotificationOutputTypeGrouping.NotificationOutputType;
49 import org.opendaylight.yangtools.yang.common.ErrorTag;
50 import org.opendaylight.yangtools.yang.common.ErrorType;
51 import org.opendaylight.yangtools.yang.common.QName;
52 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
53 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifier;
54 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifierWithPredicates;
55 import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
56 import org.opendaylight.yangtools.yang.data.api.schema.DataContainerChild;
57 import org.opendaylight.yangtools.yang.data.api.schema.LeafNode;
58 import org.opendaylight.yangtools.yang.data.api.schema.LeafSetEntryNode;
59 import org.opendaylight.yangtools.yang.data.api.schema.LeafSetNode;
60 import org.opendaylight.yangtools.yang.data.api.schema.MapEntryNode;
61 import org.opendaylight.yangtools.yang.data.api.schema.MapNode;
62 import org.opendaylight.yangtools.yang.data.impl.schema.Builders;
63 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
64 import org.opendaylight.yangtools.yang.model.api.EffectiveModelContext;
65 import org.opendaylight.yangtools.yang.model.api.stmt.NotificationEffectiveStatement;
66 import org.opendaylight.yangtools.yang.model.api.stmt.SchemaNodeIdentifier.Absolute;
67 import org.slf4j.Logger;
68 import org.slf4j.LoggerFactory;
69
70 /**
71  * This singleton class is responsible for creation, removal and searching for {@link DataTreeChangeStream} or
72  * {@link NotificationStream} listeners.
73  */
// FIXME: this should be tied to ietf-restconf-monitoring, as the Strings used as keys of its stream map are stream
//        names. We essentially need a component which deals with allocation of stream names, their lifecycle and
//        the contents of /restconf-state/streams.
77 public abstract sealed class ListenersBroker {
78     /**
79      * A ListenersBroker working with Server-Sent Events.
80      */
81     public static final class ServerSentEvents extends ListenersBroker {
82         public ServerSentEvents(final DOMDataBroker dataBroker) {
83             super(dataBroker);
84         }
85
86         @Override
87         public String baseStreamLocation(final UriInfo uriInfo) {
88             return uriInfo.getBaseUriBuilder()
89                 .replacePath(URLConstants.BASE_PATH + '/' + URLConstants.STREAMS_SUBPATH)
90                 .build()
91                 .toString();
92         }
93     }
94
95     /**
96      * A ListenersBroker working with WebSockets.
97      */
98     public static final class WebSockets extends ListenersBroker {
99         public WebSockets(final DOMDataBroker dataBroker) {
100             super(dataBroker);
101         }
102
103         @Override
104         public String baseStreamLocation(final UriInfo uriInfo) {
105             final var scheme = switch (uriInfo.getAbsolutePath().getScheme()) {
106                 // Secured HTTP goes to Secured WebSockets
107                 case "https" -> "wss";
108                 // Unsecured HTTP and others go to unsecured WebSockets
109                 default -> "ws";
110             };
111
112             return uriInfo.getBaseUriBuilder()
113                 .scheme(scheme)
114                 .replacePath(URLConstants.BASE_PATH + '/' + URLConstants.STREAMS_SUBPATH)
115                 .build()
116                 .toString();
117         }
118     }
119
120     /**
121      * Factory interface for creating instances of {@link RestconfStream}.
122      *
123      * @param <T> {@link RestconfStream} type
124      */
125     @FunctionalInterface
126     public interface StreamFactory<T extends RestconfStream<?>> {
127         /**
128          * Create a stream with the supplied name.
129          *
130          * @param name Stream name
131          * @return An {@link RestconfStream}
132          */
133         @NonNull T createStream(@NonNull String name);
134     }
135
136     private static final Logger LOG = LoggerFactory.getLogger(ListenersBroker.class);
137     private static final YangInstanceIdentifier RESTCONF_STATE_STREAMS = YangInstanceIdentifier.of(
138         NodeIdentifier.create(RestconfState.QNAME),
139         NodeIdentifier.create(Streams.QNAME),
140         NodeIdentifier.create(Stream.QNAME));
141
142     @VisibleForTesting
143     static final QName NAME_QNAME =  QName.create(Stream.QNAME, "name").intern();
144     @VisibleForTesting
145     static final QName DESCRIPTION_QNAME = QName.create(Stream.QNAME, "description").intern();
146     @VisibleForTesting
147     static final QName ENCODING_QNAME =  QName.create(Stream.QNAME, "encoding").intern();
148     @VisibleForTesting
149     static final QName LOCATION_QNAME =  QName.create(Stream.QNAME, "location").intern();
150
151     private static final NodeIdentifier DATASTORE_NODEID = NodeIdentifier.create(
152         QName.create(CreateDataChangeEventSubscriptionInput1.QNAME, "datastore").intern());
153     @Deprecated(forRemoval = true)
154     private static final NodeIdentifier OUTPUT_TYPE_NODEID = NodeIdentifier.create(
155         QName.create(NotificationOutputTypeGrouping.QNAME, "notification-output-type").intern());
156     private static final NodeIdentifier DEVICE_NOTIFICATION_PATH_NODEID =
157         NodeIdentifier.create(QName.create(SubscribeDeviceNotificationInput.QNAME, "path").intern());
158     private static final NodeIdentifier DEVICE_NOTIFICATION_STREAM_PATH_NODEID =
159         NodeIdentifier.create(QName.create(SubscribeDeviceNotificationInput.QNAME, "stream-path").intern());
160
161     private static final NodeIdentifier SAL_REMOTE_OUTPUT_NODEID =
162         NodeIdentifier.create(CreateDataChangeEventSubscriptionOutput.QNAME);
163     private static final NodeIdentifier NOTIFICATIONS =
164         NodeIdentifier.create(QName.create(CreateNotificationStreamInput.QNAME, "notifications").intern());
165     private static final NodeIdentifier PATH_NODEID =
166         NodeIdentifier.create(QName.create(CreateDataChangeEventSubscriptionInput.QNAME, "path").intern());
167     private static final NodeIdentifier STREAM_NAME_NODEID =
168         NodeIdentifier.create(QName.create(CreateDataChangeEventSubscriptionOutput.QNAME, "stream-name").intern());
169
170     private final ConcurrentMap<String, RestconfStream<?>> streams = new ConcurrentHashMap<>();
171     private final DOMDataBroker dataBroker;
172
173     private ListenersBroker(final DOMDataBroker dataBroker) {
174         this.dataBroker = requireNonNull(dataBroker);
175     }
176
177     /**
178      * Get a {@link RestconfStream} by its name.
179      *
180      * @param streamName Stream name.
181      * @return A {@link RestconfStream}, or {@code null} if the stream with specified name does not exist.
182      * @throws NullPointerException if {@code streamName} is {@code null}
183      */
184     public final @Nullable RestconfStream<?> getStream(final String streamName) {
185         return streams.get(streamName);
186     }
187
188     /**
189      * Create a {@link RestconfStream} with a unique name. This method will atomically generate a stream name, create
190      * the corresponding instance and register it.
191      *
192      * @param <T> Stream type
193      * @param baseStreamLocation base streams location
194      * @param factory Factory for creating the actual stream instance
195      * @return A {@link RestconfStream} instance
196      * @throws NullPointerException if {@code factory} is {@code null}
197      */
198     final <T extends RestconfStream<?>> @NonNull RestconfFuture<T> createStream(final String description,
199             final String baseStreamLocation, final StreamFactory<T> factory) {
200         String name;
201         T stream;
202         do {
203             // Use Type 4 (random) UUID. While we could just use it as a plain string, be nice to observers and anchor
204             // it into UUID URN namespace as defined by RFC4122
205             name = "urn:uuid:" + UUID.randomUUID().toString();
206             stream = factory.createStream(name);
207         } while (streams.putIfAbsent(name, stream) != null);
208
209         // final captures for use with FutureCallback
210         final var streamName = name;
211         final var finalStream = stream;
212
213         // Now issue a put operation
214         final var ret = new SettableRestconfFuture<T>();
215         final var tx = dataBroker.newWriteOnlyTransaction();
216
217         tx.put(LogicalDatastoreType.OPERATIONAL, restconfStateStreamPath(streamName),
218             streamEntry(streamName, description, baseStreamLocation + '/' + streamName, ""));
219         tx.commit().addCallback(new FutureCallback<CommitInfo>() {
220             @Override
221             public void onSuccess(final CommitInfo result) {
222                 LOG.debug("Stream {} added", streamName);
223                 ret.set(finalStream);
224             }
225
226             @Override
227             public void onFailure(final Throwable cause) {
228                 LOG.debug("Failed to add stream {}", streamName, cause);
229                 streams.remove(streamName, finalStream);
230                 ret.setFailure(new RestconfDocumentedException("Failed to allocate stream " + streamName, cause));
231             }
232         }, MoreExecutors.directExecutor());
233         return ret;
234     }
235
236     /**
237      * Remove a particular stream and remove its entry from operational datastore.
238      *
239      * @param stream Stream to remove
240      */
241     final void removeStream(final RestconfStream<?> stream) {
242         // Defensive check to see if we are still tracking the stream
243         final var streamName = stream.name();
244         if (streams.get(streamName) != stream) {
245             LOG.warn("Stream {} does not match expected instance {}, skipping datastore update", streamName, stream);
246             return;
247         }
248
249         // Now issue a delete operation while the name is still protected by being associated in the map.
250         final var tx = dataBroker.newWriteOnlyTransaction();
251         tx.delete(LogicalDatastoreType.OPERATIONAL, restconfStateStreamPath(streamName));
252         tx.commit().addCallback(new FutureCallback<CommitInfo>() {
253             @Override
254             public void onSuccess(final CommitInfo result) {
255                 LOG.debug("Stream {} removed", streamName);
256                 streams.remove(streamName, stream);
257             }
258
259             @Override
260             public void onFailure(final Throwable cause) {
261                 LOG.warn("Failed to remove stream {}, operational datastore may be inconsistent", streamName, cause);
262                 streams.remove(streamName, stream);
263             }
264         }, MoreExecutors.directExecutor());
265     }
266
267     private static @NonNull YangInstanceIdentifier restconfStateStreamPath(final String streamName) {
268         return RESTCONF_STATE_STREAMS.node(NodeIdentifierWithPredicates.of(Stream.QNAME, NAME_QNAME, streamName));
269     }
270
271     /**
272      * Return the base location URL of the streams service based on request URI.
273      *
274      * @param uriInfo request URL information
275      * @return location URL
276      */
277     public abstract @NonNull String baseStreamLocation(UriInfo uriInfo);
278
279     /**
280      * Create data-change-event stream with POST operation via RPC.
281      *
282      * @param input Input of RPC - example in JSON (data-change-event stream):
283      *              <pre>
284      *              {@code
285      *                  {
286      *                      "input": {
287      *                          "path": "/toaster:toaster/toaster:toasterStatus",
288      *                          "sal-remote-augment:datastore": "OPERATIONAL",
289      *                      }
290      *                  }
291      *              }
292      *              </pre>
293      * @param modelContext Reference to {@link EffectiveModelContext}.
294      * @return {@link DOMRpcResult} - Output of RPC - example in JSON:
295      *     <pre>
296      *     {@code
297      *         {
298      *             "output": {
299      *                 "stream-name": "toaster:toaster/toaster:toasterStatus/datastore=OPERATIONAL/scope=ONE"
300      *             }
301      *         }
302      *     }
303      *     </pre>
304      */
305     // FIXME: this really should be a normal RPC implementation
306     public final RestconfFuture<Optional<ContainerNode>> createDataChangeNotifiStream(
307             final DatabindProvider databindProvider, final UriInfo uriInfo, final ContainerNode input,
308             final EffectiveModelContext modelContext) {
309         final var datastoreName = extractStringLeaf(input, DATASTORE_NODEID);
310         final var datastore = datastoreName != null ? LogicalDatastoreType.valueOf(datastoreName)
311             : LogicalDatastoreType.CONFIGURATION;
312         final var path = preparePath(input);
313
314         final var outputType = prepareOutputType(input);
315         return createStream(
316             "Events occuring in " + datastore + " datastore under /" + IdentifierCodec.serialize(path, modelContext),
317             baseStreamLocation(uriInfo),
318             name -> new DataTreeChangeStream(this, name, outputType, databindProvider, datastore, path))
319             .transform(stream -> Optional.of(Builders.containerBuilder()
320                 .withNodeIdentifier(SAL_REMOTE_OUTPUT_NODEID)
321                 .withChild(ImmutableNodes.leafNode(STREAM_NAME_NODEID, stream.name()))
322                 .build()));
323     }
324
325 // FIXME: NETCONF-1102: this part needs to be invoked from subscriber
326 //    /**
327 //     * Register listener by streamName in identifier to listen to data change notifications, and put or delete
328 //     * information about listener to DS according to ietf-restconf-monitoring.
329 //     *
330 //     * @param identifier              Identifier as stream name.
331 //     * @param uriInfo                 Base URI information.
332 //     * @param notificationQueryParams Query parameters of notification.
333 //     * @param handlersHolder          Holder of handlers for notifications.
334 //     * @return Location for listening.
335 //     */
336 //    public final URI subscribeToDataStream(final String identifier, final UriInfo uriInfo,
337 //            final ReceiveEventsParams notificationQueryParams, final HandlersHolder handlersHolder) {
338 //        final var streamName = createStreamNameFromUri(identifier);
339 //        final var listener = dataChangeListenerFor(streamName);
340 //        if (listener == null) {
341 //            throw new RestconfDocumentedException("No listener found for stream " + streamName,
342 //                ErrorType.APPLICATION, ErrorTag.DATA_MISSING);
343 //        }
344 //
345 //        listener.setQueryParams(notificationQueryParams);
346 //        listener.listen(dataBroker);
347 //    }
348
349     // FIXME: this really should be a normal RPC implementation
350     public final RestconfFuture<Optional<ContainerNode>> createNotificationStream(
351             final DatabindProvider databindProvider, final UriInfo uriInfo, final ContainerNode input,
352             final EffectiveModelContext modelContext) {
353         final var qnames = ((LeafSetNode<String>) input.getChildByArg(NOTIFICATIONS)).body().stream()
354             .map(LeafSetEntryNode::body)
355             .map(QName::create)
356             .sorted()
357             .collect(ImmutableSet.toImmutableSet());
358
359         final var description = new StringBuilder("YANG notifications matching any of {");
360         var haveFirst = false;
361         for (var qname : qnames) {
362             final var module = modelContext.findModuleStatement(qname.getModule())
363                 .orElseThrow(() -> new RestconfDocumentedException(qname + " refers to an unknown module",
364                     ErrorType.APPLICATION, ErrorTag.INVALID_VALUE));
365             final var stmt = module.findSchemaTreeNode(qname)
366                 .orElseThrow(() -> new RestconfDocumentedException(qname + " refers to an unknown notification",
367                     ErrorType.APPLICATION, ErrorTag.INVALID_VALUE));
368             if (!(stmt instanceof NotificationEffectiveStatement)) {
369                 throw new RestconfDocumentedException(qname + " refers to a non-notification",
370                     ErrorType.APPLICATION, ErrorTag.INVALID_VALUE);
371             }
372
373             if (haveFirst) {
374                 description.append(",\n");
375             } else {
376                 haveFirst = true;
377             }
378             description.append("\n  ")
379                 .append(module.argument().getLocalName()).append(':').append(qname.getLocalName());
380         }
381         description.append("\n}");
382
383         // registration of the listener
384         final var outputType = prepareOutputType(input);
385         return createStream(description.toString(), baseStreamLocation(uriInfo),
386             name -> new NotificationStream(this, name, outputType, databindProvider, qnames))
387             .transform(stream -> Optional.of(Builders.containerBuilder()
388                 .withNodeIdentifier(SAL_REMOTE_OUTPUT_NODEID)
389                 .withChild(ImmutableNodes.leafNode(STREAM_NAME_NODEID, stream.name()))
390                 .build()));
391     }
392
393     /**
394      * Create device notification stream.
395      *
396      * @param input RPC input
397      * @param mountPointService dom mount point service
398      * @return {@link DOMRpcResult} - Output of RPC - example in JSON
399      */
400     // FIXME: this should be an RPC invocation
401     public final RestconfFuture<Optional<ContainerNode>> createDeviceNotificationStream(final UriInfo uriInfo,
402             final ContainerNode input, final EffectiveModelContext modelContext,
403             final DOMMountPointService mountPointService) {
404         // parsing out of container with settings and path
405         // FIXME: ugly cast
406         final var path = (YangInstanceIdentifier) input.findChildByArg(DEVICE_NOTIFICATION_PATH_NODEID)
407                 .map(DataContainerChild::body)
408                 .orElseThrow(() -> new RestconfDocumentedException("No path specified", ErrorType.APPLICATION,
409                     ErrorTag.DATA_MISSING));
410
411         if (!(path.getLastPathArgument() instanceof NodeIdentifierWithPredicates listId)) {
412             throw new RestconfDocumentedException("Path does not refer to a list item", ErrorType.APPLICATION,
413                 ErrorTag.INVALID_VALUE);
414         }
415         if (listId.size() != 1) {
416             throw new RestconfDocumentedException("Target list uses multiple keys", ErrorType.APPLICATION,
417                 ErrorTag.INVALID_VALUE);
418         }
419
420         final DOMMountPoint mountPoint = mountPointService.getMountPoint(path)
421             .orElseThrow(() -> new RestconfDocumentedException("Mount point not available", ErrorType.APPLICATION,
422                 ErrorTag.OPERATION_FAILED));
423
424         final DOMNotificationService mountNotifService = mountPoint.getService(DOMNotificationService.class)
425             .orElseThrow(() -> new RestconfDocumentedException("Mount point does not support notifications",
426                 ErrorType.APPLICATION, ErrorTag.OPERATION_FAILED));
427
428         final var mountModelContext = mountPoint.getService(DOMSchemaService.class)
429             .orElseThrow(() -> new RestconfDocumentedException("Mount point schema not available",
430                 ErrorType.APPLICATION, ErrorTag.OPERATION_FAILED))
431             .getGlobalContext();
432         final var notificationPaths = mountModelContext.getModuleStatements().values().stream()
433             .flatMap(module -> module.streamEffectiveSubstatements(NotificationEffectiveStatement.class))
434             .map(notification -> Absolute.of(notification.argument()))
435             .collect(ImmutableSet.toImmutableSet());
436         if (notificationPaths.isEmpty()) {
437             throw new RestconfDocumentedException("Device does not support notification", ErrorType.APPLICATION,
438                 ErrorTag.OPERATION_FAILED);
439         }
440
441         final var baseStreamsUri = baseStreamLocation(uriInfo);
442         final var outputType = prepareOutputType(input);
443         return createStream(
444             "All YANG notifications occuring on mount point /" + IdentifierCodec.serialize(path, modelContext),
445             baseStreamsUri,
446             streamName -> new DeviceNotificationStream(this, streamName, outputType, mountModelContext,
447                 mountPointService, mountPoint.getIdentifier()))
448             .transform(stream -> {
449                 stream.listen(mountNotifService, notificationPaths);
450                 return Optional.of(Builders.containerBuilder()
451                     .withNodeIdentifier(new NodeIdentifier(SubscribeDeviceNotificationOutput.QNAME))
452                     .withChild(ImmutableNodes.leafNode(DEVICE_NOTIFICATION_STREAM_PATH_NODEID,
453                         baseStreamsUri + '/' + stream.name()))
454                     .build());
455             });
456     }
457
458     /**
459      * Prepare {@link NotificationOutputType}.
460      *
461      * @param data Container with stream settings (RPC create-stream).
462      * @return Parsed {@link NotificationOutputType}.
463      */
464     @Deprecated(forRemoval = true)
465     private static NotificationOutputType prepareOutputType(final ContainerNode data) {
466         final String outputName = extractStringLeaf(data, OUTPUT_TYPE_NODEID);
467         return outputName != null ? NotificationOutputType.valueOf(outputName) : NotificationOutputType.XML;
468     }
469
470     /**
471      * Prepare {@link YangInstanceIdentifier} of stream source.
472      *
473      * @param data Container with stream settings (RPC create-stream).
474      * @return Parsed {@link YangInstanceIdentifier} of data element from which the data-change-event notifications
475      *         are going to be generated.
476      */
477     private static YangInstanceIdentifier preparePath(final ContainerNode data) {
478         final var pathLeaf = data.childByArg(PATH_NODEID);
479         if (pathLeaf != null && pathLeaf.body() instanceof YangInstanceIdentifier pathValue) {
480             return pathValue;
481         }
482
483         throw new RestconfDocumentedException("Instance identifier was not normalized correctly",
484             ErrorType.APPLICATION, ErrorTag.OPERATION_FAILED);
485     }
486
487     private static @Nullable String extractStringLeaf(final ContainerNode data, final NodeIdentifier childName) {
488         return data.childByArg(childName) instanceof LeafNode<?> leafNode && leafNode.body() instanceof String str
489             ? str : null;
490     }
491
492     @VisibleForTesting
493     static @NonNull MapEntryNode streamEntry(final String name, final String description, final String location,
494             final String outputType) {
495         return Builders.mapEntryBuilder()
496             .withNodeIdentifier(NodeIdentifierWithPredicates.of(Stream.QNAME, NAME_QNAME, name))
497             .withChild(ImmutableNodes.leafNode(NAME_QNAME, name))
498             .withChild(ImmutableNodes.leafNode(DESCRIPTION_QNAME, description))
499             .withChild(createAccessList(outputType, location))
500             .build();
501     }
502
503     private static MapNode createAccessList(final String outputType, final String location) {
504         return Builders.mapBuilder()
505             .withNodeIdentifier(new NodeIdentifier(Access.QNAME))
506             .withChild(Builders.mapEntryBuilder()
507                 .withNodeIdentifier(NodeIdentifierWithPredicates.of(Access.QNAME, ENCODING_QNAME, outputType))
508                 .withChild(ImmutableNodes.leafNode(ENCODING_QNAME, outputType))
509                 .withChild(ImmutableNodes.leafNode(LOCATION_QNAME, location))
510                 .build())
511             .build();
512     }
513 }