ListenersBroker requires DOMDataBroker
[netconf.git] / restconf / restconf-nb / src / main / java / org / opendaylight / restconf / nb / rfc8040 / streams / ListenersBroker.java
1 /*
2  * Copyright © 2019 FRINX s.r.o. All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.restconf.nb.rfc8040.streams;
9
10 import static java.util.Objects.requireNonNull;
11
12 import com.google.common.collect.ImmutableSet;
13 import com.google.common.util.concurrent.FutureCallback;
14 import com.google.common.util.concurrent.MoreExecutors;
15 import java.net.URI;
16 import java.util.Optional;
17 import java.util.UUID;
18 import java.util.concurrent.ConcurrentHashMap;
19 import java.util.concurrent.ConcurrentMap;
20 import javax.ws.rs.core.UriInfo;
21 import org.eclipse.jdt.annotation.NonNull;
22 import org.eclipse.jdt.annotation.Nullable;
23 import org.opendaylight.mdsal.common.api.CommitInfo;
24 import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
25 import org.opendaylight.mdsal.dom.api.DOMDataBroker;
26 import org.opendaylight.mdsal.dom.api.DOMMountPoint;
27 import org.opendaylight.mdsal.dom.api.DOMMountPointService;
28 import org.opendaylight.mdsal.dom.api.DOMNotificationService;
29 import org.opendaylight.mdsal.dom.api.DOMRpcResult;
30 import org.opendaylight.mdsal.dom.api.DOMSchemaService;
31 import org.opendaylight.restconf.common.errors.RestconfDocumentedException;
32 import org.opendaylight.restconf.common.errors.RestconfFuture;
33 import org.opendaylight.restconf.nb.rfc8040.URLConstants;
34 import org.opendaylight.restconf.nb.rfc8040.databind.DatabindProvider;
35 import org.opendaylight.yang.gen.v1.urn.opendaylight.device.notification.rev221106.SubscribeDeviceNotificationInput;
36 import org.opendaylight.yang.gen.v1.urn.opendaylight.device.notification.rev221106.SubscribeDeviceNotificationOutput;
37 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.remote.rev140114.CreateDataChangeEventSubscriptionInput;
38 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.remote.rev140114.CreateDataChangeEventSubscriptionOutput;
39 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.remote.rev140114.CreateNotificationStreamInput;
40 import org.opendaylight.yang.gen.v1.urn.sal.restconf.event.subscription.rev140708.CreateDataChangeEventSubscriptionInput1;
41 import org.opendaylight.yang.gen.v1.urn.sal.restconf.event.subscription.rev140708.NotificationOutputTypeGrouping;
42 import org.opendaylight.yang.gen.v1.urn.sal.restconf.event.subscription.rev140708.NotificationOutputTypeGrouping.NotificationOutputType;
43 import org.opendaylight.yangtools.yang.common.ErrorTag;
44 import org.opendaylight.yangtools.yang.common.ErrorType;
45 import org.opendaylight.yangtools.yang.common.QName;
46 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
47 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifier;
48 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifierWithPredicates;
49 import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
50 import org.opendaylight.yangtools.yang.data.api.schema.DataContainerChild;
51 import org.opendaylight.yangtools.yang.data.api.schema.LeafNode;
52 import org.opendaylight.yangtools.yang.data.api.schema.LeafSetEntryNode;
53 import org.opendaylight.yangtools.yang.data.api.schema.LeafSetNode;
54 import org.opendaylight.yangtools.yang.data.impl.schema.Builders;
55 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
56 import org.opendaylight.yangtools.yang.model.api.EffectiveModelContext;
57 import org.opendaylight.yangtools.yang.model.api.stmt.NotificationEffectiveStatement;
58 import org.opendaylight.yangtools.yang.model.api.stmt.SchemaNodeIdentifier.Absolute;
59 import org.slf4j.Logger;
60 import org.slf4j.LoggerFactory;
61
62 /**
63  * This singleton class is responsible for creation, removal and searching for {@link ListenerAdapter} or
64  * {@link NotificationListenerAdapter} listeners.
65  */
66 // FIXME: furthermore, this should be tied to ietf-restconf-monitoring, as the Strings used in its maps are stream
67 //        names. We essentially need a component which deals with allocation of stream names and their lifecycle and
68 //        the contents of /restconf-state/streams.
69 public abstract sealed class ListenersBroker {
70     /**
71      * A ListenersBroker working with Server-Sent Events.
72      */
73     public static final class ServerSentEvents extends ListenersBroker {
74         public ServerSentEvents(final DOMDataBroker dataBroker) {
75             super(dataBroker);
76         }
77
78         @Override
79         public URI prepareUriByStreamName(final UriInfo uriInfo, final String streamName) {
80             return uriInfo.getBaseUriBuilder()
81                 .replacePath(URLConstants.BASE_PATH + '/' + URLConstants.STREAMS_SUBPATH + '/' + streamName)
82                 .build();
83         }
84     }
85
86     /**
87      * A ListenersBroker working with WebSockets.
88      */
89     public static final class WebSockets extends ListenersBroker {
90         public WebSockets(final DOMDataBroker dataBroker) {
91             super(dataBroker);
92         }
93
94         @Override
95         public URI prepareUriByStreamName(final UriInfo uriInfo, final String streamName) {
96             final var scheme = switch (uriInfo.getAbsolutePath().getScheme()) {
97                 // Secured HTTP goes to Secured WebSockets
98                 case "https" -> "wss";
99                 // Unsecured HTTP and others go to unsecured WebSockets
100                 default -> "ws";
101             };
102
103             return uriInfo.getBaseUriBuilder()
104                 .scheme(scheme)
105                 .replacePath(URLConstants.BASE_PATH + '/' + URLConstants.STREAMS_SUBPATH + '/' + streamName)
106                 .build();
107         }
108     }
109
110     /**
111      * Factory interface for creating instances of {@link AbstractStream}.
112      *
113      * @param <T> {@link AbstractStream} type
114      */
115     @FunctionalInterface
116     public interface StreamFactory<T extends AbstractStream<?>> {
117         /**
118          * Create a stream with the supplied name.
119          *
120          * @param name Stream name
121          * @return An {@link AbstractStream}
122          */
123         @NonNull T createStream(@NonNull String name);
124     }
125
126     /**
127      * Holder of all handlers for notifications.
128      */
129     // FIXME: why do we even need this class?!
130     private record HandlersHolder(
131             @NonNull DOMDataBroker dataBroker,
132             @NonNull DOMNotificationService notificationService,
133             @NonNull DatabindProvider databindProvider) {
134
135         HandlersHolder {
136             requireNonNull(dataBroker);
137             requireNonNull(notificationService);
138             requireNonNull(databindProvider);
139         }
140     }
141
142 //    private static final QName LOCATION_QNAME = QName.create(Notifi.QNAME, "location").intern();
143 //    private static final NodeIdentifier LOCATION_NODEID = NodeIdentifier.create(LOCATION_QNAME);
144 //    private static final String STREAMS_PATH = "ietf-restconf-monitoring:restconf-state/streams";
145 //    private static final String STREAM_PATH_PART = "/stream=";
146 //    private static final String STREAM_PATH = STREAMS_PATH + STREAM_PATH_PART;
147 //    private static final String STREAM_ACCESS_PATH_PART = "/access=";
148 //    private static final String STREAM_LOCATION_PATH_PART = "/location";
149 //
150 //    private final ListenersBroker listenersBroker;
151 //    private final HandlersHolder handlersHolder;
152 //
153 //  // FIXME: NETCONF:1102: do not instantiate this service
154 //  new RestconfStreamsSubscriptionServiceImpl(dataBroker, notificationService, databindProvider,
155 //      listenersBroker),
156 //
157 //    /**
158 //     * Initialize holder of handlers with holders as parameters.
159 //     *
160 //     * @param dataBroker {@link DOMDataBroker}
161 //     * @param notificationService {@link DOMNotificationService}
162 //     * @param databindProvider a {@link DatabindProvider}
163 //     * @param listenersBroker a {@link ListenersBroker}
164 //     */
165 //    public RestconfStreamsSubscriptionServiceImpl(final DOMDataBroker dataBroker,
166 //            final DOMNotificationService notificationService, final DatabindProvider databindProvider,
167 //            final ListenersBroker listenersBroker) {
168 //        handlersHolder = new HandlersHolder(dataBroker, notificationService, databindProvider);
169 //        this.listenersBroker = requireNonNull(listenersBroker);
170 //    }
171 //
172 //    @Override
173 //    public Response subscribeToStream(final String identifier, final UriInfo uriInfo) {
174 //        final var params = QueryParams.newReceiveEventsParams(uriInfo);
175 //
176 //        final URI location;
177 //        if (identifier.contains(RestconfStreamsConstants.DATA_SUBSCRIPTION)) {
178 //            location = listenersBroker.subscribeToDataStream(identifier, uriInfo, params, handlersHolder);
179 //        } else if (identifier.contains(RestconfStreamsConstants.NOTIFICATION_STREAM)) {
180 //            location = listenersBroker.subscribeToYangStream(identifier, uriInfo, params, handlersHolder);
181 //        } else {
182 //            final String msg = "Bad type of notification of sal-remote";
183 //            LOG.warn(msg);
184 //            throw new RestconfDocumentedException(msg);
185 //        }
186 //
187 //        return Response.ok()
188 //            .location(location)
189 //            .entity(new NormalizedNodePayload(
190 //                Inference.ofDataTreePath(handlersHolder.databindProvider().currentContext().modelContext(),
191 //                    Notifi.QNAME, LOCATION_QNAME),
192 //                ImmutableNodes.leafNode(LOCATION_NODEID, location.toString())))
193 //            .build();
194 //    }
195
196     private static final Logger LOG = LoggerFactory.getLogger(ListenersBroker.class);
197
198     private static final QName DATASTORE_QNAME =
199         QName.create(CreateDataChangeEventSubscriptionInput1.QNAME, "datastore").intern();
200     private static final QName OUTPUT_TYPE_QNAME =
201         QName.create(NotificationOutputTypeGrouping.QNAME, "notification-output-type").intern();
202     private static final QName DEVICE_NOTIFICATION_PATH_QNAME =
203         QName.create(SubscribeDeviceNotificationInput.QNAME, "path").intern();
204     private static final QName DEVICE_NOTIFICATION_STREAM_PATH =
205         QName.create(DEVICE_NOTIFICATION_PATH_QNAME, "stream-path").intern();
206     private static final NodeIdentifier DATASTORE_NODEID = NodeIdentifier.create(DATASTORE_QNAME);
207     private static final NodeIdentifier OUTPUT_TYPE_NODEID = NodeIdentifier.create(OUTPUT_TYPE_QNAME);
208     private static final NodeIdentifier DEVICE_NOTIFICATION_PATH_NODEID =
209         NodeIdentifier.create(DEVICE_NOTIFICATION_PATH_QNAME);
210     private static final NodeIdentifier SAL_REMOTE_OUTPUT_NODEID =
211         NodeIdentifier.create(CreateDataChangeEventSubscriptionOutput.QNAME);
212     private static final NodeIdentifier NOTIFICATIONS =
213         NodeIdentifier.create(QName.create(CreateNotificationStreamInput.QNAME, "notifications").intern());
214     private static final NodeIdentifier PATH_NODEID =
215         NodeIdentifier.create(QName.create(CreateDataChangeEventSubscriptionInput.QNAME, "path").intern());
216     private static final NodeIdentifier STREAM_NAME_NODEID =
217         NodeIdentifier.create(QName.create(CreateDataChangeEventSubscriptionOutput.QNAME, "stream-name").intern());
218
219     private final ConcurrentMap<String, AbstractStream<?>> streams = new ConcurrentHashMap<>();
220     private final DOMDataBroker dataBroker;
221
222     private ListenersBroker(final DOMDataBroker dataBroker) {
223         this.dataBroker = requireNonNull(dataBroker);
224     }
225
226     /**
227      * Get an {@link AbstractStream} by its name.
228      *
229      * @param streamName Stream name.
230      * @return An {@link AbstractStream}, or {@code null} if the stream with specified name does not exist.
231      * @throws NullPointerException if {@code streamName} is {@code null}
232      */
233     public final @Nullable AbstractStream<?> getStream(final String streamName) {
234         return streams.get(streamName);
235     }
236
237     /**
238      * Create an {@link AbstractStream} with a unique name. This method will atomically generate a stream name, create
239      * the corresponding instance and register it
240      *
241      * @param <T> Stream type
242      * @param factory Factory for creating the actual stream instance
243      * @return An {@link AbstractStream} instance
244      * @throws NullPointerException if {@code factory} is {@code null}
245      */
246     public final <T extends AbstractStream<?>> @NonNull T createStream(final StreamFactory<T> factory) {
247         String name;
248         T stream;
249         do {
250             // Use Type 4 (random) UUID. While we could just use it as a plain string, be nice to observers and anchor
251             // it into UUID URN namespace as defined by RFC4122
252             name = "urn:uuid:" + UUID.randomUUID().toString();
253             stream = factory.createStream(name);
254         } while (streams.putIfAbsent(name, stream) != null);
255
256         return stream;
257     }
258
259     /**
260      * Remove a particular stream and remove its entry from operational datastore.
261      *
262      * @param stream Stream to remove
263      */
264     final void removeStream(final AbstractStream<?> stream) {
265         // Defensive check to see if we are still tracking the stream
266         final var streamName = stream.getStreamName();
267         if (streams.get(streamName) != stream) {
268             LOG.warn("Stream {} does not match expected instance {}, skipping datastore update", streamName, stream);
269             return;
270         }
271
272         // Now issue a delete operation while the name is still protected by being associated in the map.
273         final var tx = dataBroker.newWriteOnlyTransaction();
274         tx.delete(LogicalDatastoreType.OPERATIONAL, RestconfStateStreams.restconfStateStreamPath(streamName));
275         tx.commit().addCallback(new FutureCallback<CommitInfo>() {
276             @Override
277             public void onSuccess(final CommitInfo result) {
278                 LOG.debug("Stream {} removed", streamName);
279                 streams.remove(streamName, stream);
280             }
281
282             @Override
283             public void onFailure(final Throwable cause) {
284                 LOG.warn("Failed to remove stream {}, operational datastore may be inconsistent", streamName, cause);
285                 streams.remove(streamName, stream);
286             }
287         }, MoreExecutors.directExecutor());
288     }
289
290     /**
291      * Creates string representation of stream name from URI. Removes slash from URI in start and end positions,
292      * and optionally {@link URLConstants#BASE_PATH} prefix.
293      *
294      * @param uri URI for creation of stream name.
295      * @return String representation of stream name.
296      */
297 //    private static String createStreamNameFromUri(final String uri) {
298 //        String result = requireNonNull(uri);
299 //        while (true) {
300 //            if (result.startsWith(URLConstants.BASE_PATH)) {
301 //                result = result.substring(URLConstants.BASE_PATH.length());
302 //            } else if (result.startsWith("/")) {
303 //                result = result.substring(1);
304 //            } else {
305 //                break;
306 //            }
307 //        }
308 //        if (result.endsWith("/")) {
309 //            result = result.substring(0, result.length() - 1);
310 //        }
311 //        return result;
312 //    }
313
314     /**
315      * Prepare URL from base name and stream name.
316      *
317      * @param uriInfo base URL information
318      * @param streamName name of stream for create
319      * @return final URL
320      */
321     public abstract @NonNull URI prepareUriByStreamName(UriInfo uriInfo, String streamName);
322
323     // FIXME: callers are utter duplicates, refactor them
324 //    private static void writeDataToDS(final DOMDataTreeWriteOperations tx, final MapEntryNode mapToStreams) {
325 //        // FIXME: use put() here
326 //        tx.merge(LogicalDatastoreType.OPERATIONAL, RestconfStateStreams.restconfStateStreamPath(mapToStreams.name()),
327 //            mapToStreams);
328 //    }
329 //
330 //    private static void submitData(final DOMDataTreeWriteTransaction readWriteTransaction) {
331 //        try {
332 //            readWriteTransaction.commit().get();
333 //        } catch (final InterruptedException | ExecutionException e) {
334 //            throw new RestconfDocumentedException("Problem while putting data to DS.", e);
335 //        }
336 //    }
337
338     /**
339      * Create data-change-event stream with POST operation via RPC.
340      *
341      * @param input Input of RPC - example in JSON (data-change-event stream):
342      *              <pre>
343      *              {@code
344      *                  {
345      *                      "input": {
346      *                          "path": "/toaster:toaster/toaster:toasterStatus",
347      *                          "sal-remote-augment:datastore": "OPERATIONAL",
348      *                          "sal-remote-augment:scope": "ONE"
349      *                      }
350      *                  }
351      *              }
352      *              </pre>
353      * @param modelContext Reference to {@link EffectiveModelContext}.
354      * @return {@link DOMRpcResult} - Output of RPC - example in JSON:
355      *     <pre>
356      *     {@code
357      *         {
358      *             "output": {
359      *                 "stream-name": "toaster:toaster/toaster:toasterStatus/datastore=OPERATIONAL/scope=ONE"
360      *             }
361      *         }
362      *     }
363      *     </pre>
364      */
365     // FIXME: this really should be a normal RPC implementation
366     public final RestconfFuture<Optional<ContainerNode>> createDataChangeNotifiStream(final ContainerNode input,
367             final EffectiveModelContext modelContext) {
368         final var datastoreName = extractStringLeaf(input, DATASTORE_NODEID);
369         final var datastore = datastoreName != null ? LogicalDatastoreType.valueOf(datastoreName)
370             : LogicalDatastoreType.CONFIGURATION;
371         final var path = preparePath(input);
372         final var outputType = prepareOutputType(input);
373         final var adapter = createStream(name -> new ListenerAdapter(name, outputType, this, datastore, path));
374
375         // building of output
376         return RestconfFuture.of(Optional.of(Builders.containerBuilder()
377             .withNodeIdentifier(SAL_REMOTE_OUTPUT_NODEID)
378             .withChild(ImmutableNodes.leafNode(STREAM_NAME_NODEID, adapter.getStreamName()))
379             .build()));
380     }
381
382 //    /**
383 //     * Register listener by streamName in identifier to listen to data change notifications, and put or delete
384 //     * information about listener to DS according to ietf-restconf-monitoring.
385 //     *
386 //     * @param identifier              Identifier as stream name.
387 //     * @param uriInfo                 Base URI information.
388 //     * @param notificationQueryParams Query parameters of notification.
389 //     * @param handlersHolder          Holder of handlers for notifications.
390 //     * @return Location for listening.
391 //     */
392 //    public final URI subscribeToDataStream(final String identifier, final UriInfo uriInfo,
393 //            final ReceiveEventsParams notificationQueryParams, final HandlersHolder handlersHolder) {
394 //        final var streamName = createStreamNameFromUri(identifier);
395 //        final var listener = dataChangeListenerFor(streamName);
396 //        if (listener == null) {
397 //            throw new RestconfDocumentedException("No listener found for stream " + streamName,
398 //                ErrorType.APPLICATION, ErrorTag.DATA_MISSING);
399 //        }
400 //
401 //        listener.setQueryParams(notificationQueryParams);
402 //
403 //        final var dataBroker = handlersHolder.dataBroker();
404 //        final var schemaHandler = handlersHolder.databindProvider();
405 //        listener.setCloseVars(schemaHandler);
406 //        listener.listen(dataBroker);
407 //
408 //        final var uri = prepareUriByStreamName(uriInfo, streamName);
409 //        final var schemaContext = schemaHandler.currentContext().modelContext();
410 //        final var serializedPath = IdentifierCodec.serialize(listener.getPath(), schemaContext);
411 //
412 //        final var mapToStreams = RestconfStateStreams.dataChangeStreamEntry(listener.getPath(),
413 //                listener.getOutputType(), uri, schemaContext, serializedPath);
414 //        final var writeTransaction = dataBroker.newWriteOnlyTransaction();
415 //        writeDataToDS(writeTransaction, mapToStreams);
416 //        submitData(writeTransaction);
417 //        return uri;
418 //    }
419
420     // FIXME: this really should be a normal RPC implementation
421     public final RestconfFuture<Optional<ContainerNode>> createNotificationStream(final ContainerNode input,
422             final EffectiveModelContext modelContext) {
423         final var qnames = ((LeafSetNode<String>) input.getChildByArg(NOTIFICATIONS)).body().stream()
424             .map(LeafSetEntryNode::body)
425             .map(QName::create)
426             .sorted()
427             .collect(ImmutableSet.toImmutableSet());
428
429         for (var qname : qnames) {
430             if (modelContext.findNotification(qname).isEmpty()) {
431                 throw new RestconfDocumentedException(qname + " refers to an unknown notification",
432                     ErrorType.APPLICATION, ErrorTag.INVALID_VALUE);
433             }
434         }
435
436 // FIXME: use this block to create a stream description
437 //        final var module = refSchemaCtx.findModuleStatement(qname.getModule())
438 //            .orElseThrow(() -> new RestconfDocumentedException(qname + " refers to an unknown module",
439 //                ErrorType.APPLICATION, ErrorTag.INVALID_VALUE));
440 //        final var stmt = module.findSchemaTreeNode(qname)
441 //            .orElseThrow(() -> new RestconfDocumentedException(qname + " refers to an notification",
442 //                ErrorType.APPLICATION, ErrorTag.INVALID_VALUE));
443 //        if (!(stmt instanceof NotificationEffectiveStatement)) {
444 //            throw new RestconfDocumentedException(qname + " refers to a non-notification",
445 //                ErrorType.APPLICATION, ErrorTag.INVALID_VALUE);
446 //        }
447 //
448 //        if (haveFirst) {
449 //            sb.append(',');
450 //        } else {
451 //            haveFirst = true;
452 //        }
453 //        sb.append(module.argument().getLocalName()).append(':').append(qname.getLocalName());
454
455         // registration of the listener
456         final var outputType = prepareOutputType(input);
457         final var adapter = createStream(name -> new NotificationListenerAdapter(name, outputType, this, qnames));
458
459         return RestconfFuture.of(Optional.of(Builders.containerBuilder()
460             .withNodeIdentifier(SAL_REMOTE_OUTPUT_NODEID)
461             .withChild(ImmutableNodes.leafNode(STREAM_NAME_NODEID, adapter.getStreamName()))
462             .build()));
463     }
464
465     /**
466      * Register listener by streamName in identifier to listen to yang notifications, and put or delete information
467      * about listener to DS according to ietf-restconf-monitoring.
468      *
469      * @param identifier              Name of the stream.
470      * @param uriInfo                 URI information.
471      * @param notificationQueryParams Query parameters of notification.
472      * @param handlersHolder          Holder of handlers for notifications.
473      * @return Stream location for listening.
474      */
475 //    public final @NonNull URI subscribeToYangStream(final String identifier, final UriInfo uriInfo,
476 //            final ReceiveEventsParams notificationQueryParams, final HandlersHolder handlersHolder) {
477 //        final String streamName = createStreamNameFromUri(identifier);
478 //        if (isNullOrEmpty(streamName)) {
479 //            throw new RestconfDocumentedException("Stream name is empty", ErrorType.PROTOCOL, ErrorTag.INVALID_VALUE);
480 //        }
481 //
482 //        final var notificationListenerAdapter = notificationListenerFor(streamName);
483 //        if (notificationListenerAdapter == null) {
484 //            throw new RestconfDocumentedException("Stream with name %s was not found".formatted(streamName),
485 //                ErrorType.PROTOCOL, ErrorTag.UNKNOWN_ELEMENT);
486 //        }
487 //
488 //        final URI uri = prepareUriByStreamName(uriInfo, streamName);
489 //        notificationListenerAdapter.setQueryParams(notificationQueryParams);
490 //        notificationListenerAdapter.listen(handlersHolder.notificationService());
491 //        final DOMDataBroker dataBroker = handlersHolder.dataBroker();
492 //        notificationListenerAdapter.setCloseVars(handlersHolder.databindProvider());
493 //        final MapEntryNode mapToStreams = RestconfStateStreams.notificationStreamEntry(streamName,
494 //            notificationListenerAdapter.qnames(), notificationListenerAdapter.getOutputType(), uri);
495 //
496 //        // FIXME: how does this correlate with the transaction notificationListenerAdapter.close() will do?
497 //        final DOMDataTreeWriteTransaction writeTransaction = dataBroker.newWriteOnlyTransaction();
498 //        writeDataToDS(writeTransaction, mapToStreams);
499 //        submitData(writeTransaction);
500 //        return uri;
501 //    }
502
503     /**
504      * Create device notification stream.
505      *
506      * @param baseUrl base Url
507      * @param input RPC input
508      * @param mountPointService dom mount point service
509      * @return {@link DOMRpcResult} - Output of RPC - example in JSON
510      */
511     // FIXME: this should be an RPC invocation
512     public final RestconfFuture<Optional<ContainerNode>> createDeviceNotificationStream(final ContainerNode input,
513             final String baseUrl, final DOMMountPointService mountPointService) {
514         // parsing out of container with settings and path
515         // FIXME: ugly cast
516         final var path = (YangInstanceIdentifier) input.findChildByArg(DEVICE_NOTIFICATION_PATH_NODEID)
517                 .map(DataContainerChild::body)
518                 .orElseThrow(() -> new RestconfDocumentedException("No path specified", ErrorType.APPLICATION,
519                     ErrorTag.DATA_MISSING));
520
521         if (!(path.getLastPathArgument() instanceof NodeIdentifierWithPredicates listId)) {
522             throw new RestconfDocumentedException("Path does not refer to a list item", ErrorType.APPLICATION,
523                 ErrorTag.INVALID_VALUE);
524         }
525         if (listId.size() != 1) {
526             throw new RestconfDocumentedException("Target list uses multiple keys", ErrorType.APPLICATION,
527                 ErrorTag.INVALID_VALUE);
528         }
529
530         final DOMMountPoint mountPoint = mountPointService.getMountPoint(path)
531             .orElseThrow(() -> new RestconfDocumentedException("Mount point not available", ErrorType.APPLICATION,
532                 ErrorTag.OPERATION_FAILED));
533
534         final DOMNotificationService mountNotifService = mountPoint.getService(DOMNotificationService.class)
535             .orElseThrow(() -> new RestconfDocumentedException("Mount point does not support notifications",
536                 ErrorType.APPLICATION, ErrorTag.OPERATION_FAILED));
537
538         final var mountModelContext = mountPoint.getService(DOMSchemaService.class)
539             .orElseThrow(() -> new RestconfDocumentedException("Mount point schema not available",
540                 ErrorType.APPLICATION, ErrorTag.OPERATION_FAILED))
541             .getGlobalContext();
542         final var notificationPaths = mountModelContext.getModuleStatements().values().stream()
543             .flatMap(module -> module.streamEffectiveSubstatements(NotificationEffectiveStatement.class))
544             .map(notification -> Absolute.of(notification.argument()))
545             .collect(ImmutableSet.toImmutableSet());
546         if (notificationPaths.isEmpty()) {
547             throw new RestconfDocumentedException("Device does not support notification", ErrorType.APPLICATION,
548                 ErrorTag.OPERATION_FAILED);
549         }
550
551 // FIXME: use this for description?
552 //        final String deviceName = listId.values().iterator().next().toString();
553
554         final var outputType = prepareOutputType(input);
555         final var notificationListenerAdapter = createStream(
556             streamName -> new DeviceNotificationListenerAdaptor(streamName, outputType, this, mountModelContext,
557                 mountPointService, mountPoint.getIdentifier()));
558         notificationListenerAdapter.listen(mountNotifService, notificationPaths);
559
560         return RestconfFuture.of(Optional.of(Builders.containerBuilder()
561             .withNodeIdentifier(new NodeIdentifier(SubscribeDeviceNotificationOutput.QNAME))
562             .withChild(ImmutableNodes.leafNode(DEVICE_NOTIFICATION_STREAM_PATH,
563                 baseUrl + notificationListenerAdapter.getStreamName()))
564             .build()));
565     }
566
567     /**
568      * Prepare {@link NotificationOutputType}.
569      *
570      * @param data Container with stream settings (RPC create-stream).
571      * @return Parsed {@link NotificationOutputType}.
572      */
573     private static NotificationOutputType prepareOutputType(final ContainerNode data) {
574         final String outputName = extractStringLeaf(data, OUTPUT_TYPE_NODEID);
575         return outputName != null ? NotificationOutputType.valueOf(outputName) : NotificationOutputType.XML;
576     }
577
578     /**
579      * Prepare {@link YangInstanceIdentifier} of stream source.
580      *
581      * @param data Container with stream settings (RPC create-stream).
582      * @return Parsed {@link YangInstanceIdentifier} of data element from which the data-change-event notifications
583      *         are going to be generated.
584      */
585     private static YangInstanceIdentifier preparePath(final ContainerNode data) {
586         final var pathLeaf = data.childByArg(PATH_NODEID);
587         if (pathLeaf != null && pathLeaf.body() instanceof YangInstanceIdentifier pathValue) {
588             return pathValue;
589         }
590
591         throw new RestconfDocumentedException("Instance identifier was not normalized correctly",
592             ErrorType.APPLICATION, ErrorTag.OPERATION_FAILED);
593     }
594
595     private static @Nullable String extractStringLeaf(final ContainerNode data, final NodeIdentifier childName) {
596         return data.childByArg(childName) instanceof LeafNode<?> leafNode && leafNode.body() instanceof String str
597             ? str : null;
598     }
599 }