283fe5ca346f208c136636dd09776eb665b434e7
[netconf.git] /
1 /*
2  * Copyright © 2019 FRINX s.r.o. All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.restconf.nb.rfc8040.streams;
9
10 import static com.google.common.base.Strings.isNullOrEmpty;
11 import static java.util.Objects.requireNonNull;
12
13 import com.google.common.collect.BiMap;
14 import com.google.common.collect.HashBiMap;
15 import com.google.common.collect.ImmutableSet;
16 import java.net.URI;
17 import java.util.Optional;
18 import java.util.concurrent.ExecutionException;
19 import java.util.concurrent.locks.StampedLock;
20 import javax.ws.rs.core.UriInfo;
21 import org.eclipse.jdt.annotation.NonNull;
22 import org.eclipse.jdt.annotation.Nullable;
23 import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
24 import org.opendaylight.mdsal.dom.api.DOMDataBroker;
25 import org.opendaylight.mdsal.dom.api.DOMDataTreeWriteOperations;
26 import org.opendaylight.mdsal.dom.api.DOMDataTreeWriteTransaction;
27 import org.opendaylight.mdsal.dom.api.DOMMountPoint;
28 import org.opendaylight.mdsal.dom.api.DOMMountPointService;
29 import org.opendaylight.mdsal.dom.api.DOMNotificationService;
30 import org.opendaylight.mdsal.dom.api.DOMRpcResult;
31 import org.opendaylight.mdsal.dom.api.DOMSchemaService;
32 import org.opendaylight.restconf.common.errors.RestconfDocumentedException;
33 import org.opendaylight.restconf.common.errors.RestconfFuture;
34 import org.opendaylight.restconf.nb.rfc8040.ReceiveEventsParams;
35 import org.opendaylight.restconf.nb.rfc8040.URLConstants;
36 import org.opendaylight.restconf.nb.rfc8040.databind.DatabindProvider;
37 import org.opendaylight.restconf.nb.rfc8040.utils.parser.IdentifierCodec;
38 import org.opendaylight.yang.gen.v1.urn.opendaylight.device.notification.rev221106.SubscribeDeviceNotificationInput;
39 import org.opendaylight.yang.gen.v1.urn.opendaylight.device.notification.rev221106.SubscribeDeviceNotificationOutput;
40 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.remote.rev140114.CreateDataChangeEventSubscriptionInput;
41 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.remote.rev140114.CreateDataChangeEventSubscriptionOutput;
42 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.remote.rev140114.CreateNotificationStreamInput;
43 import org.opendaylight.yang.gen.v1.urn.sal.restconf.event.subscription.rev140708.CreateDataChangeEventSubscriptionInput1.Scope;
44 import org.opendaylight.yang.gen.v1.urn.sal.restconf.event.subscription.rev140708.NotificationOutputTypeGrouping;
45 import org.opendaylight.yang.gen.v1.urn.sal.restconf.event.subscription.rev140708.NotificationOutputTypeGrouping.NotificationOutputType;
46 import org.opendaylight.yangtools.yang.common.ErrorTag;
47 import org.opendaylight.yangtools.yang.common.ErrorType;
48 import org.opendaylight.yangtools.yang.common.QName;
49 import org.opendaylight.yangtools.yang.common.QNameModule;
50 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
51 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifier;
52 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifierWithPredicates;
53 import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
54 import org.opendaylight.yangtools.yang.data.api.schema.DataContainerChild;
55 import org.opendaylight.yangtools.yang.data.api.schema.LeafNode;
56 import org.opendaylight.yangtools.yang.data.api.schema.LeafSetEntryNode;
57 import org.opendaylight.yangtools.yang.data.api.schema.LeafSetNode;
58 import org.opendaylight.yangtools.yang.data.api.schema.MapEntryNode;
59 import org.opendaylight.yangtools.yang.data.impl.schema.Builders;
60 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
61 import org.opendaylight.yangtools.yang.model.api.EffectiveModelContext;
62 import org.opendaylight.yangtools.yang.model.api.NotificationDefinition;
63 import org.opendaylight.yangtools.yang.model.api.stmt.NotificationEffectiveStatement;
64 import org.opendaylight.yangtools.yang.model.api.stmt.SchemaNodeIdentifier.Absolute;
65 import org.slf4j.Logger;
66 import org.slf4j.LoggerFactory;
67
68 /**
69  * This singleton class is responsible for creation, removal and searching for {@link ListenerAdapter} or
70  * {@link NotificationListenerAdapter} listeners.
71  */
72 // FIXME: furthermore, this should be tied to ietf-restconf-monitoring, as the Strings used in its maps are stream
73 //        names. We essentially need a component which deals with allocation of stream names and their lifecycle and
74 //        the contents of /restconf-state/streams.
75 public abstract sealed class ListenersBroker {
76     /**
77      * A ListenersBroker working with Server-Sent Events.
78      */
79     public static final class ServerSentEvents extends ListenersBroker {
80         @Override
81         public URI prepareUriByStreamName(final UriInfo uriInfo, final String streamName) {
82             return uriInfo.getBaseUriBuilder()
83                 .replacePath(URLConstants.BASE_PATH + '/' + URLConstants.STREAMS_SUBPATH + '/' + streamName)
84                 .build();
85         }
86     }
87
88     /**
89      * A ListenersBroker working with WebSockets.
90      */
91     public static final class WebSockets extends ListenersBroker {
92         @Override
93         public URI prepareUriByStreamName(final UriInfo uriInfo, final String streamName) {
94             final var scheme = switch (uriInfo.getAbsolutePath().getScheme()) {
95                 // Secured HTTP goes to Secured WebSockets
96                 case "https" -> "wss";
97                 // Unsecured HTTP and others go to unsecured WebSockets
98                 default -> "ws";
99             };
100
101             return uriInfo.getBaseUriBuilder()
102                 .scheme(scheme)
103                 .replacePath(URLConstants.BASE_PATH + '/' + URLConstants.STREAMS_SUBPATH + '/' + streamName)
104                 .build();
105         }
106     }
107
108     /**
109      * Holder of all handlers for notifications.
110      */
111     // FIXME: why do we even need this class?!
112     private record HandlersHolder(
113             @NonNull DOMDataBroker dataBroker,
114             @NonNull DOMNotificationService notificationService,
115             @NonNull DatabindProvider databindProvider) {
116
117         HandlersHolder {
118             requireNonNull(dataBroker);
119             requireNonNull(notificationService);
120             requireNonNull(databindProvider);
121         }
122     }
123
124 //    private static final QName LOCATION_QNAME = QName.create(Notifi.QNAME, "location").intern();
125 //    private static final NodeIdentifier LOCATION_NODEID = NodeIdentifier.create(LOCATION_QNAME);
126 //    private static final String STREAMS_PATH = "ietf-restconf-monitoring:restconf-state/streams";
127 //    private static final String STREAM_PATH_PART = "/stream=";
128 //    private static final String STREAM_PATH = STREAMS_PATH + STREAM_PATH_PART;
129 //    private static final String STREAM_ACCESS_PATH_PART = "/access=";
130 //    private static final String STREAM_LOCATION_PATH_PART = "/location";
131 //
132 //    private final ListenersBroker listenersBroker;
133 //    private final HandlersHolder handlersHolder;
134 //
135 //  // FIXME: NETCONF:1102: do not instantiate this service
136 //  new RestconfStreamsSubscriptionServiceImpl(dataBroker, notificationService, databindProvider,
137 //      listenersBroker),
138 //
139 //    /**
140 //     * Initialize holder of handlers with holders as parameters.
141 //     *
142 //     * @param dataBroker {@link DOMDataBroker}
143 //     * @param notificationService {@link DOMNotificationService}
144 //     * @param databindProvider a {@link DatabindProvider}
145 //     * @param listenersBroker a {@link ListenersBroker}
146 //     */
147 //    public RestconfStreamsSubscriptionServiceImpl(final DOMDataBroker dataBroker,
148 //            final DOMNotificationService notificationService, final DatabindProvider databindProvider,
149 //            final ListenersBroker listenersBroker) {
150 //        handlersHolder = new HandlersHolder(dataBroker, notificationService, databindProvider);
151 //        this.listenersBroker = requireNonNull(listenersBroker);
152 //    }
153 //
154 //    @Override
155 //    public Response subscribeToStream(final String identifier, final UriInfo uriInfo) {
156 //        final var params = QueryParams.newReceiveEventsParams(uriInfo);
157 //
158 //        final URI location;
159 //        if (identifier.contains(RestconfStreamsConstants.DATA_SUBSCRIPTION)) {
160 //            location = listenersBroker.subscribeToDataStream(identifier, uriInfo, params, handlersHolder);
161 //        } else if (identifier.contains(RestconfStreamsConstants.NOTIFICATION_STREAM)) {
162 //            location = listenersBroker.subscribeToYangStream(identifier, uriInfo, params, handlersHolder);
163 //        } else {
164 //            final String msg = "Bad type of notification of sal-remote";
165 //            LOG.warn(msg);
166 //            throw new RestconfDocumentedException(msg);
167 //        }
168 //
169 //        return Response.ok()
170 //            .location(location)
171 //            .entity(new NormalizedNodePayload(
172 //                Inference.ofDataTreePath(handlersHolder.databindProvider().currentContext().modelContext(),
173 //                    Notifi.QNAME, LOCATION_QNAME),
174 //                ImmutableNodes.leafNode(LOCATION_NODEID, location.toString())))
175 //            .build();
176 //    }
177
178     private static final Logger LOG = LoggerFactory.getLogger(ListenersBroker.class);
179
180     // Prefixes for stream names
181     private static final String DATA_SUBSCRIPTION = "data-change-event-subscription";
182     private static final String NOTIFICATION_STREAM = "notification-stream";
183     private static final String DEVICE_NOTIFICATION_STREAM = "device-notification-stream";
184
185     private static final QNameModule SAL_REMOTE_AUGMENT = NotificationOutputTypeGrouping.QNAME.getModule();
186
187     private static final QNameModule DEVICE_NOTIFICATION_MODULE = SubscribeDeviceNotificationInput.QNAME.getModule();
188     private static final QName DATASTORE_QNAME =
189         QName.create(SAL_REMOTE_AUGMENT, RestconfStreamsConstants.DATASTORE_PARAM_NAME).intern();
190     private static final QName SCOPE_QNAME =
191         QName.create(SAL_REMOTE_AUGMENT, RestconfStreamsConstants.SCOPE_PARAM_NAME).intern();
192     private static final QName OUTPUT_TYPE_QNAME =
193         QName.create(SAL_REMOTE_AUGMENT, "notification-output-type").intern();
194     private static final QName DEVICE_NOTIFICATION_PATH_QNAME =
195         QName.create(DEVICE_NOTIFICATION_MODULE, "path").intern();
196     private static final QName DEVICE_NOTIFICATION_STREAM_PATH =
197         QName.create(DEVICE_NOTIFICATION_PATH_QNAME, "stream-path").intern();
198     private static final NodeIdentifier DATASTORE_NODEID = NodeIdentifier.create(DATASTORE_QNAME);
199     private static final NodeIdentifier SCOPE_NODEID = NodeIdentifier.create(SCOPE_QNAME);
200     private static final NodeIdentifier OUTPUT_TYPE_NODEID = NodeIdentifier.create(OUTPUT_TYPE_QNAME);
201     private static final NodeIdentifier DEVICE_NOTIFICATION_PATH_NODEID =
202         NodeIdentifier.create(DEVICE_NOTIFICATION_PATH_QNAME);
203     private static final NodeIdentifier SAL_REMOTE_OUTPUT_NODEID =
204         NodeIdentifier.create(CreateDataChangeEventSubscriptionOutput.QNAME);
205     private static final NodeIdentifier NOTIFICATIONS =
206         NodeIdentifier.create(QName.create(CreateNotificationStreamInput.QNAME, "notifications").intern());
207     private static final NodeIdentifier PATH_NODEID =
208         NodeIdentifier.create(QName.create(CreateDataChangeEventSubscriptionInput.QNAME, "path").intern());
209     private static final NodeIdentifier STREAM_NAME_NODEID =
210         NodeIdentifier.create(QName.create(CreateDataChangeEventSubscriptionOutput.QNAME, "stream-name").intern());
211
212     private final StampedLock dataChangeListenersLock = new StampedLock();
213     private final StampedLock notificationListenersLock = new StampedLock();
214     private final StampedLock deviceNotificationListenersLock = new StampedLock();
215     private final BiMap<String, ListenerAdapter> dataChangeListeners = HashBiMap.create();
216     private final BiMap<String, NotificationListenerAdapter> notificationListeners = HashBiMap.create();
217     private final BiMap<String, DeviceNotificationListenerAdaptor> deviceNotificationListeners = HashBiMap.create();
218
219     private ListenersBroker() {
220         // Hidden on purpose
221     }
222
223     /**
224      * Gets {@link ListenerAdapter} specified by stream identification.
225      *
226      * @param streamName Stream name.
227      * @return {@link ListenerAdapter} specified by stream name or {@code null} if listener with specified stream name
228      *         does not exist.
229      * @throws NullPointerException in {@code streamName} is {@code null}
230      */
231     public final @Nullable ListenerAdapter dataChangeListenerFor(final String streamName) {
232         requireNonNull(streamName);
233
234         final long stamp = dataChangeListenersLock.readLock();
235         try {
236             return dataChangeListeners.get(streamName);
237         } finally {
238             dataChangeListenersLock.unlockRead(stamp);
239         }
240     }
241
242     /**
243      * Gets {@link NotificationListenerAdapter} specified by stream name.
244      *
245      * @param streamName Stream name.
246      * @return {@link NotificationListenerAdapter} specified by stream name or {@code null} if listener with specified
247      *         stream name does not exist.
248      * @throws NullPointerException in {@code streamName} is {@code null}
249      */
250     public final @Nullable NotificationListenerAdapter notificationListenerFor(final String streamName) {
251         requireNonNull(streamName);
252
253         final long stamp = notificationListenersLock.readLock();
254         try {
255             return notificationListeners.get(streamName);
256         } finally {
257             notificationListenersLock.unlockRead(stamp);
258         }
259     }
260
261     /**
262      * Get listener for device path.
263      *
264      * @param streamName name.
265      * @return {@link DeviceNotificationListenerAdaptor} specified by stream name or {@code null} if listener with
266      *         specified stream name does not exist.
267      * @throws NullPointerException in {@code path} is {@code null}
268      */
269     public final @Nullable DeviceNotificationListenerAdaptor deviceNotificationListenerFor(final String streamName) {
270         requireNonNull(streamName);
271
272         final long stamp = deviceNotificationListenersLock.readLock();
273         try {
274             return deviceNotificationListeners.get(streamName);
275         } finally {
276             deviceNotificationListenersLock.unlockRead(stamp);
277         }
278     }
279
280     /**
281      * Get listener for stream-name.
282      *
283      * @param streamName Stream name.
284      * @return {@link NotificationListenerAdapter} or {@link ListenerAdapter} object wrapped in {@link Optional}
285      *     or {@link Optional#empty()} if listener with specified stream name doesn't exist.
286      */
287     public final @Nullable AbstractStream<?> listenerFor(final String streamName) {
288         if (streamName.startsWith(NOTIFICATION_STREAM)) {
289             return notificationListenerFor(streamName);
290         } else if (streamName.startsWith(DATA_SUBSCRIPTION)) {
291             return dataChangeListenerFor(streamName);
292         } else if (streamName.startsWith(DEVICE_NOTIFICATION_STREAM)) {
293             return deviceNotificationListenerFor(streamName);
294         } else {
295             return null;
296         }
297     }
298
299     /**
300      * Creates new {@link ListenerAdapter} listener using input stream name and path if such listener
301      * hasn't been created yet.
302      *
303      * @param path       Path to data in data repository.
304      * @param outputType Specific type of output for notifications - XML or JSON.
305      * @return Created or existing data-change listener adapter.
306      */
307     public final ListenerAdapter registerDataChangeListener(final EffectiveModelContext modelContext,
308             final LogicalDatastoreType datastore, final YangInstanceIdentifier path, final Scope scope,
309             final NotificationOutputType outputType) {
310         final var sb = new StringBuilder(DATA_SUBSCRIPTION)
311             .append('/').append(createStreamNameFromUri(IdentifierCodec.serialize(path, modelContext)))
312             .append('/').append(RestconfStreamsConstants.DATASTORE_PARAM_NAME).append('=').append(datastore)
313             .append('/').append(RestconfStreamsConstants.SCOPE_PARAM_NAME).append('=').append(scope);
314         if (outputType != NotificationOutputType.XML) {
315             sb.append('/').append(outputType.getName());
316         }
317
318         final long stamp = dataChangeListenersLock.writeLock();
319         try {
320             return dataChangeListeners.computeIfAbsent(sb.toString(),
321                 streamName -> new ListenerAdapter(streamName, outputType, this, datastore, path));
322         } finally {
323             dataChangeListenersLock.unlockWrite(stamp);
324         }
325     }
326
327     /**
328      * Creates new {@link NotificationDefinition} listener using input stream name and schema path
329      * if such listener haven't been created yet.
330      *
331      * @param refSchemaCtx reference {@link EffectiveModelContext}
332      * @param notifications {@link QName}s of accepted YANG notifications
333      * @param outputType Specific type of output for notifications - XML or JSON.
334      * @return Created or existing notification listener adapter.
335      */
336     public final NotificationListenerAdapter registerNotificationListener(final EffectiveModelContext refSchemaCtx,
337             final ImmutableSet<QName> notifications, final NotificationOutputType outputType) {
338         final var sb = new StringBuilder(NOTIFICATION_STREAM).append('/');
339         var haveFirst = false;
340         for (var qname : notifications) {
341             final var module = refSchemaCtx.findModuleStatement(qname.getModule())
342                 .orElseThrow(() -> new RestconfDocumentedException(qname + " refers to an unknown module",
343                     ErrorType.APPLICATION, ErrorTag.INVALID_VALUE));
344             final var stmt = module.findSchemaTreeNode(qname)
345                 .orElseThrow(() -> new RestconfDocumentedException(qname + " refers to an notification",
346                     ErrorType.APPLICATION, ErrorTag.INVALID_VALUE));
347             if (!(stmt instanceof NotificationEffectiveStatement)) {
348                 throw new RestconfDocumentedException(qname + " refers to a non-notification",
349                     ErrorType.APPLICATION, ErrorTag.INVALID_VALUE);
350             }
351
352             if (haveFirst) {
353                 sb.append(',');
354             } else {
355                 haveFirst = true;
356             }
357             sb.append(module.argument().getLocalName()).append(':').append(qname.getLocalName());
358         }
359         if (outputType != NotificationOutputType.XML) {
360             sb.append('/').append(outputType.getName());
361         }
362
363         final long stamp = notificationListenersLock.writeLock();
364         try {
365             return notificationListeners.computeIfAbsent(sb.toString(),
366                 streamName -> new NotificationListenerAdapter(streamName, outputType, this, notifications));
367         } finally {
368             notificationListenersLock.unlockWrite(stamp);
369         }
370     }
371
372     /**
373      * Creates new {@link DeviceNotificationListenerAdaptor} listener using input stream name and schema path
374      * if such listener haven't been created yet.
375      *
376      * @param deviceName Device name.
377      * @param outputType Specific type of output for notifications - XML or JSON.
378      * @param refSchemaCtx Schema context of node
379      * @param mountPointService Mount point service
380      * @return Created or existing device notification listener adapter.
381      */
382     private DeviceNotificationListenerAdaptor registerDeviceNotificationListener(final String deviceName,
383             final NotificationOutputType outputType, final EffectiveModelContext refSchemaCtx,
384             final DOMMountPointService mountPointService, final YangInstanceIdentifier path) {
385         final var sb = new StringBuilder(DEVICE_NOTIFICATION_STREAM).append('/')
386             .append(deviceName);
387
388         final long stamp = deviceNotificationListenersLock.writeLock();
389         try {
390             return deviceNotificationListeners.computeIfAbsent(sb.toString(),
391                 streamName -> new DeviceNotificationListenerAdaptor(streamName, outputType, this, refSchemaCtx,
392                     mountPointService, path));
393         } finally {
394             deviceNotificationListenersLock.unlockWrite(stamp);
395         }
396     }
397
398     /**
399      * Removal and closing of all data-change-event and notification listeners.
400      */
401     public final synchronized void removeAndCloseAllListeners() {
402         final long stampNotifications = notificationListenersLock.writeLock();
403         final long stampDataChanges = dataChangeListenersLock.writeLock();
404         try {
405             removeAndCloseAllDataChangeListenersTemplate();
406             removeAndCloseAllNotificationListenersTemplate();
407         } finally {
408             dataChangeListenersLock.unlockWrite(stampDataChanges);
409             notificationListenersLock.unlockWrite(stampNotifications);
410         }
411     }
412
413     /**
414      * Closes and removes all data-change listeners.
415      */
416     public final void removeAndCloseAllDataChangeListeners() {
417         final long stamp = dataChangeListenersLock.writeLock();
418         try {
419             removeAndCloseAllDataChangeListenersTemplate();
420         } finally {
421             dataChangeListenersLock.unlockWrite(stamp);
422         }
423     }
424
425     @SuppressWarnings("checkstyle:IllegalCatch")
426     private void removeAndCloseAllDataChangeListenersTemplate() {
427         dataChangeListeners.values().forEach(listenerAdapter -> {
428             try {
429                 listenerAdapter.close();
430             } catch (Exception e) {
431                 LOG.error("Failed to close data-change listener {}.", listenerAdapter, e);
432                 throw new IllegalStateException("Failed to close data-change listener %s.".formatted(listenerAdapter),
433                     e);
434             }
435         });
436         dataChangeListeners.clear();
437     }
438
439     /**
440      * Closes and removes all notification listeners.
441      */
442     public final void removeAndCloseAllNotificationListeners() {
443         final long stamp = notificationListenersLock.writeLock();
444         try {
445             removeAndCloseAllNotificationListenersTemplate();
446         } finally {
447             notificationListenersLock.unlockWrite(stamp);
448         }
449     }
450
451     @SuppressWarnings("checkstyle:IllegalCatch")
452     private void removeAndCloseAllNotificationListenersTemplate() {
453         notificationListeners.values().forEach(listenerAdapter -> {
454             try {
455                 listenerAdapter.close();
456             } catch (Exception e) {
457                 LOG.error("Failed to close notification listener {}.", listenerAdapter, e);
458                 throw new IllegalStateException("Failed to close notification listener %s.".formatted(listenerAdapter),
459                     e);
460             }
461         });
462         notificationListeners.clear();
463     }
464
465     /**
466      * Removes and closes data-change listener of type {@link ListenerAdapter} specified in parameter.
467      *
468      * @param listener Listener to be closed and removed.
469      */
470     @SuppressWarnings("checkstyle:IllegalCatch")
471     public final void removeAndCloseDataChangeListener(final ListenerAdapter listener) {
472         final long stamp = dataChangeListenersLock.writeLock();
473         try {
474             removeAndCloseDataChangeListenerTemplate(listener);
475         } catch (Exception exception) {
476             LOG.error("Data-change listener {} cannot be closed.", listener, exception);
477         } finally {
478             dataChangeListenersLock.unlockWrite(stamp);
479         }
480     }
481
482     /**
483      * Removes and closes data-change listener of type {@link ListenerAdapter} specified in parameter.
484      *
485      * @param listener Listener to be closed and removed.
486      */
487     private void removeAndCloseDataChangeListenerTemplate(final ListenerAdapter listener) {
488         try {
489             requireNonNull(listener).close();
490             if (dataChangeListeners.inverse().remove(listener) == null) {
491                 LOG.warn("There isn't any data-change event stream that would match listener adapter {}.", listener);
492             }
493         } catch (InterruptedException | ExecutionException e) {
494             LOG.error("Data-change listener {} cannot be closed.", listener, e);
495             throw new IllegalStateException("Data-change listener %s cannot be closed.".formatted(listener), e);
496         }
497     }
498
499     /**
500      * Removes and closes notification listener of type {@link NotificationListenerAdapter} specified in parameter.
501      *
502      * @param listener Listener to be closed and removed.
503      */
504     @SuppressWarnings("checkstyle:IllegalCatch")
505     public final void removeAndCloseNotificationListener(final NotificationListenerAdapter listener) {
506         final long stamp = notificationListenersLock.writeLock();
507         try {
508             removeAndCloseNotificationListenerTemplate(listener);
509         } catch (Exception e) {
510             LOG.error("Notification listener {} cannot be closed.", listener, e);
511         } finally {
512             notificationListenersLock.unlockWrite(stamp);
513         }
514     }
515
516     /**
517      * Removes and closes device notification listener of type {@link NotificationListenerAdapter}
518      * specified in parameter.
519      *
520      * @param listener Listener to be closed and removed.
521      */
522     @SuppressWarnings("checkstyle:IllegalCatch")
523     public final void removeAndCloseDeviceNotificationListener(final DeviceNotificationListenerAdaptor listener) {
524         final long stamp = deviceNotificationListenersLock.writeLock();
525         try {
526             requireNonNull(listener);
527             if (deviceNotificationListeners.inverse().remove(listener) == null) {
528                 LOG.warn("There isn't any device notification stream that would match listener adapter {}.", listener);
529             }
530         } catch (final Exception exception) {
531             LOG.error("Device Notification listener {} cannot be closed.", listener, exception);
532         } finally {
533             deviceNotificationListenersLock.unlockWrite(stamp);
534         }
535     }
536
537     private void removeAndCloseNotificationListenerTemplate(final NotificationListenerAdapter listener) {
538         try {
539             requireNonNull(listener).close();
540             if (notificationListeners.inverse().remove(listener) == null) {
541                 LOG.warn("There isn't any notification stream that would match listener adapter {}.", listener);
542             }
543         } catch (InterruptedException | ExecutionException e) {
544             LOG.error("Notification listener {} cannot be closed.", listener, e);
545             throw new IllegalStateException("Notification listener %s cannot be closed.".formatted(listener), e);
546         }
547     }
548
549     /**
550      * Removal and closing of general listener (data-change or notification listener).
551      *
552      * @param stream Stream to be closed and removed from cache.
553      */
554     final void removeAndCloseListener(final AbstractStream<?> stream) {
555         requireNonNull(stream);
556         if (stream instanceof ListenerAdapter dataChange) {
557             removeAndCloseDataChangeListener(dataChange);
558         } else if (stream instanceof NotificationListenerAdapter notification) {
559             removeAndCloseNotificationListener(notification);
560         }
561     }
562
563     /**
564      * Creates string representation of stream name from URI. Removes slash from URI in start and end positions,
565      * and optionally {@link URLConstants#BASE_PATH} prefix.
566      *
567      * @param uri URI for creation of stream name.
568      * @return String representation of stream name.
569      */
570     private static String createStreamNameFromUri(final String uri) {
571         String result = requireNonNull(uri);
572         while (true) {
573             if (result.startsWith(URLConstants.BASE_PATH)) {
574                 result = result.substring(URLConstants.BASE_PATH.length());
575             } else if (result.startsWith("/")) {
576                 result = result.substring(1);
577             } else {
578                 break;
579             }
580         }
581         if (result.endsWith("/")) {
582             result = result.substring(0, result.length() - 1);
583         }
584         return result;
585     }
586
587     /**
588      * Prepare URL from base name and stream name.
589      *
590      * @param uriInfo base URL information
591      * @param streamName name of stream for create
592      * @return final URL
593      */
594     public abstract @NonNull URI prepareUriByStreamName(UriInfo uriInfo, String streamName);
595
596     /**
597      * Register listener by streamName in identifier to listen to yang notifications, and put or delete information
598      * about listener to DS according to ietf-restconf-monitoring.
599      *
600      * @param identifier              Name of the stream.
601      * @param uriInfo                 URI information.
602      * @param notificationQueryParams Query parameters of notification.
603      * @param handlersHolder          Holder of handlers for notifications.
604      * @return Stream location for listening.
605      */
606     public final @NonNull URI subscribeToYangStream(final String identifier, final UriInfo uriInfo,
607             final ReceiveEventsParams notificationQueryParams, final HandlersHolder handlersHolder) {
608         final String streamName = createStreamNameFromUri(identifier);
609         if (isNullOrEmpty(streamName)) {
610             throw new RestconfDocumentedException("Stream name is empty.", ErrorType.PROTOCOL, ErrorTag.INVALID_VALUE);
611         }
612
613         final var notificationListenerAdapter = notificationListenerFor(streamName);
614         if (notificationListenerAdapter == null) {
615             throw new RestconfDocumentedException("Stream with name %s was not found.".formatted(streamName),
616                 ErrorType.PROTOCOL, ErrorTag.UNKNOWN_ELEMENT);
617         }
618
619         final URI uri = prepareUriByStreamName(uriInfo, streamName);
620         notificationListenerAdapter.setQueryParams(notificationQueryParams);
621         notificationListenerAdapter.listen(handlersHolder.notificationService());
622         final DOMDataBroker dataBroker = handlersHolder.dataBroker();
623         notificationListenerAdapter.setCloseVars(dataBroker, handlersHolder.databindProvider());
624         final MapEntryNode mapToStreams = RestconfStateStreams.notificationStreamEntry(streamName,
625             notificationListenerAdapter.qnames(), notificationListenerAdapter.getOutputType(), uri);
626
627         // FIXME: how does this correlate with the transaction notificationListenerAdapter.close() will do?
628         final DOMDataTreeWriteTransaction writeTransaction = dataBroker.newWriteOnlyTransaction();
629         writeDataToDS(writeTransaction, mapToStreams);
630         submitData(writeTransaction);
631         return uri;
632     }
633
634     /**
635      * Register listener by streamName in identifier to listen to data change notifications, and put or delete
636      * information about listener to DS according to ietf-restconf-monitoring.
637      *
638      * @param identifier              Identifier as stream name.
639      * @param uriInfo                 Base URI information.
640      * @param notificationQueryParams Query parameters of notification.
641      * @param handlersHolder          Holder of handlers for notifications.
642      * @return Location for listening.
643      */
644     public final URI subscribeToDataStream(final String identifier, final UriInfo uriInfo,
645             final ReceiveEventsParams notificationQueryParams, final HandlersHolder handlersHolder) {
646         final var streamName = createStreamNameFromUri(identifier);
647         final var listener = dataChangeListenerFor(streamName);
648         if (listener == null) {
649             throw new RestconfDocumentedException("No listener found for stream " + streamName,
650                 ErrorType.APPLICATION, ErrorTag.DATA_MISSING);
651         }
652
653         listener.setQueryParams(notificationQueryParams);
654
655         final var dataBroker = handlersHolder.dataBroker();
656         final var schemaHandler = handlersHolder.databindProvider();
657         listener.setCloseVars(dataBroker, schemaHandler);
658         listener.listen(dataBroker);
659
660         final var uri = prepareUriByStreamName(uriInfo, streamName);
661         final var schemaContext = schemaHandler.currentContext().modelContext();
662         final var serializedPath = IdentifierCodec.serialize(listener.getPath(), schemaContext);
663
664         final var mapToStreams = RestconfStateStreams.dataChangeStreamEntry(listener.getPath(),
665                 listener.getOutputType(), uri, schemaContext, serializedPath);
666         final var writeTransaction = dataBroker.newWriteOnlyTransaction();
667         writeDataToDS(writeTransaction, mapToStreams);
668         submitData(writeTransaction);
669         return uri;
670     }
671
672     // FIXME: callers are utter duplicates, refactor them
673     private static void writeDataToDS(final DOMDataTreeWriteOperations tx, final MapEntryNode mapToStreams) {
674         // FIXME: use put() here
675         tx.merge(LogicalDatastoreType.OPERATIONAL, RestconfStateStreams.restconfStateStreamPath(mapToStreams.name()),
676             mapToStreams);
677     }
678
679     private static void submitData(final DOMDataTreeWriteTransaction readWriteTransaction) {
680         try {
681             readWriteTransaction.commit().get();
682         } catch (final InterruptedException | ExecutionException e) {
683             throw new RestconfDocumentedException("Problem while putting data to DS.", e);
684         }
685     }
686
687
688     /**
689      * Create data-change-event stream with POST operation via RPC.
690      *
691      * @param input Input of RPC - example in JSON (data-change-event stream):
692      *              <pre>
693      *              {@code
694      *                  {
695      *                      "input": {
696      *                          "path": "/toaster:toaster/toaster:toasterStatus",
697      *                          "sal-remote-augment:datastore": "OPERATIONAL",
698      *                          "sal-remote-augment:scope": "ONE"
699      *                      }
700      *                  }
701      *              }
702      *              </pre>
703      * @param modelContext Reference to {@link EffectiveModelContext}.
704      * @return {@link DOMRpcResult} - Output of RPC - example in JSON:
705      *     <pre>
706      *     {@code
707      *         {
708      *             "output": {
709      *                 "stream-name": "toaster:toaster/toaster:toasterStatus/datastore=OPERATIONAL/scope=ONE"
710      *             }
711      *         }
712      *     }
713      *     </pre>
714      */
715     // FIXME: this really should be a normal RPC implementation
716     public final RestconfFuture<Optional<ContainerNode>> createDataChangeNotifiStream(final ContainerNode input,
717             final EffectiveModelContext modelContext) {
718         final var datastoreName = extractStringLeaf(input, DATASTORE_NODEID);
719         final var scopeName = extractStringLeaf(input, SCOPE_NODEID);
720         final var adapter = registerDataChangeListener(modelContext,
721             datastoreName != null ? LogicalDatastoreType.valueOf(datastoreName) : LogicalDatastoreType.CONFIGURATION,
722             preparePath(input), scopeName != null ? Scope.ofName(scopeName) : Scope.BASE, prepareOutputType(input));
723
724         // building of output
725         return RestconfFuture.of(Optional.of(Builders.containerBuilder()
726             .withNodeIdentifier(SAL_REMOTE_OUTPUT_NODEID)
727             .withChild(ImmutableNodes.leafNode(STREAM_NAME_NODEID, adapter.getStreamName()))
728             .build()));
729     }
730
731     // FIXME: this really should be a normal RPC implementation
732     public final RestconfFuture<Optional<ContainerNode>> createNotificationStream(final ContainerNode input,
733             final EffectiveModelContext modelContext) {
734         final var qnames = ((LeafSetNode<String>) input.getChildByArg(NOTIFICATIONS)).body().stream()
735             .map(LeafSetEntryNode::body)
736             .map(QName::create)
737             .sorted()
738             .collect(ImmutableSet.toImmutableSet());
739
740         for (var qname : qnames) {
741             if (modelContext.findNotification(qname).isEmpty()) {
742                 throw new RestconfDocumentedException(qname + " refers to an unknown notification",
743                     ErrorType.APPLICATION, ErrorTag.INVALID_VALUE);
744             }
745         }
746
747         // registration of the listener
748         final var adapter = registerNotificationListener(modelContext, qnames, prepareOutputType(input));
749
750         return RestconfFuture.of(Optional.of(Builders.containerBuilder()
751             .withNodeIdentifier(SAL_REMOTE_OUTPUT_NODEID)
752             .withChild(ImmutableNodes.leafNode(STREAM_NAME_NODEID, adapter.getStreamName()))
753             .build()));
754     }
755
756     /**
757      * Create device notification stream.
758      *
759      * @param baseUrl base Url
760      * @param input RPC input
761      * @param mountPointService dom mount point service
762      * @return {@link DOMRpcResult} - Output of RPC - example in JSON
763      */
764     // FIXME: this should be an RPC invocation
765     public final RestconfFuture<Optional<ContainerNode>> createDeviceNotificationListener(final ContainerNode input,
766             final String baseUrl, final DOMMountPointService mountPointService) {
767         // parsing out of container with settings and path
768         // FIXME: ugly cast
769         final var path = (YangInstanceIdentifier) input.findChildByArg(DEVICE_NOTIFICATION_PATH_NODEID)
770                 .map(DataContainerChild::body)
771                 .orElseThrow(() -> new RestconfDocumentedException("No path specified", ErrorType.APPLICATION,
772                     ErrorTag.DATA_MISSING));
773
774         if (!(path.getLastPathArgument() instanceof NodeIdentifierWithPredicates listId)) {
775             throw new RestconfDocumentedException("Path does not refer to a list item", ErrorType.APPLICATION,
776                 ErrorTag.INVALID_VALUE);
777         }
778         if (listId.size() != 1) {
779             throw new RestconfDocumentedException("Target list uses multiple keys", ErrorType.APPLICATION,
780                 ErrorTag.INVALID_VALUE);
781         }
782         final String deviceName = listId.values().iterator().next().toString();
783
784         final DOMMountPoint mountPoint = mountPointService.getMountPoint(path)
785             .orElseThrow(() -> new RestconfDocumentedException("Mount point not available", ErrorType.APPLICATION,
786                 ErrorTag.OPERATION_FAILED));
787
788         final DOMNotificationService mountNotifService = mountPoint.getService(DOMNotificationService.class)
789             .orElseThrow(() -> new RestconfDocumentedException("Mount point does not support notifications",
790                 ErrorType.APPLICATION, ErrorTag.OPERATION_FAILED));
791
792         final var mountModelContext = mountPoint.getService(DOMSchemaService.class)
793             .orElseThrow(() -> new RestconfDocumentedException("Mount point schema not available",
794                 ErrorType.APPLICATION, ErrorTag.OPERATION_FAILED))
795             .getGlobalContext();
796         final var notificationPaths = mountModelContext.getModuleStatements().values().stream()
797             .flatMap(module -> module.streamEffectiveSubstatements(NotificationEffectiveStatement.class))
798             .map(notification -> Absolute.of(notification.argument()))
799             .collect(ImmutableSet.toImmutableSet());
800         if (notificationPaths.isEmpty()) {
801             throw new RestconfDocumentedException("Device does not support notification", ErrorType.APPLICATION,
802                 ErrorTag.OPERATION_FAILED);
803         }
804
805         final var notificationListenerAdapter = registerDeviceNotificationListener(deviceName,
806             prepareOutputType(input), mountModelContext, mountPointService, mountPoint.getIdentifier());
807         notificationListenerAdapter.listen(mountNotifService, notificationPaths);
808
809         return RestconfFuture.of(Optional.of(Builders.containerBuilder()
810             .withNodeIdentifier(new NodeIdentifier(SubscribeDeviceNotificationOutput.QNAME))
811             .withChild(ImmutableNodes.leafNode(DEVICE_NOTIFICATION_STREAM_PATH,
812                 baseUrl + notificationListenerAdapter.getStreamName()))
813             .build()));
814     }
815
816     /**
817      * Prepare {@link NotificationOutputType}.
818      *
819      * @param data Container with stream settings (RPC create-stream).
820      * @return Parsed {@link NotificationOutputType}.
821      */
822     private static NotificationOutputType prepareOutputType(final ContainerNode data) {
823         final String outputName = extractStringLeaf(data, OUTPUT_TYPE_NODEID);
824         return outputName != null ? NotificationOutputType.valueOf(outputName) : NotificationOutputType.XML;
825     }
826
827     /**
828      * Prepare {@link YangInstanceIdentifier} of stream source.
829      *
830      * @param data Container with stream settings (RPC create-stream).
831      * @return Parsed {@link YangInstanceIdentifier} of data element from which the data-change-event notifications
832      *         are going to be generated.
833      */
834     private static YangInstanceIdentifier preparePath(final ContainerNode data) {
835         final var pathLeaf = data.childByArg(PATH_NODEID);
836         if (pathLeaf != null && pathLeaf.body() instanceof YangInstanceIdentifier pathValue) {
837             return pathValue;
838         }
839
840         throw new RestconfDocumentedException("Instance identifier was not normalized correctly",
841             ErrorType.APPLICATION, ErrorTag.OPERATION_FAILED);
842     }
843
844     private static @Nullable String extractStringLeaf(final ContainerNode data, final NodeIdentifier childName) {
845         return data.childByArg(childName) instanceof LeafNode<?> leafNode && leafNode.body() instanceof String str
846             ? str : null;
847     }
848 }