2 * Copyright © 2019 FRINX s.r.o. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.restconf.nb.rfc8040.streams;
10 import static com.google.common.base.Strings.isNullOrEmpty;
11 import static java.util.Objects.requireNonNull;
13 import com.google.common.collect.BiMap;
14 import com.google.common.collect.HashBiMap;
15 import com.google.common.collect.ImmutableSet;
17 import java.util.Optional;
18 import java.util.concurrent.ExecutionException;
19 import java.util.concurrent.locks.StampedLock;
20 import javax.ws.rs.core.UriInfo;
21 import org.eclipse.jdt.annotation.NonNull;
22 import org.eclipse.jdt.annotation.Nullable;
23 import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
24 import org.opendaylight.mdsal.dom.api.DOMDataBroker;
25 import org.opendaylight.mdsal.dom.api.DOMDataTreeWriteOperations;
26 import org.opendaylight.mdsal.dom.api.DOMDataTreeWriteTransaction;
27 import org.opendaylight.mdsal.dom.api.DOMMountPoint;
28 import org.opendaylight.mdsal.dom.api.DOMMountPointService;
29 import org.opendaylight.mdsal.dom.api.DOMNotificationService;
30 import org.opendaylight.mdsal.dom.api.DOMRpcResult;
31 import org.opendaylight.mdsal.dom.api.DOMSchemaService;
32 import org.opendaylight.restconf.common.errors.RestconfDocumentedException;
33 import org.opendaylight.restconf.common.errors.RestconfFuture;
34 import org.opendaylight.restconf.nb.rfc8040.ReceiveEventsParams;
35 import org.opendaylight.restconf.nb.rfc8040.URLConstants;
36 import org.opendaylight.restconf.nb.rfc8040.databind.DatabindProvider;
37 import org.opendaylight.restconf.nb.rfc8040.utils.parser.IdentifierCodec;
38 import org.opendaylight.yang.gen.v1.urn.opendaylight.device.notification.rev221106.SubscribeDeviceNotificationInput;
39 import org.opendaylight.yang.gen.v1.urn.opendaylight.device.notification.rev221106.SubscribeDeviceNotificationOutput;
40 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.remote.rev140114.CreateDataChangeEventSubscriptionInput;
41 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.remote.rev140114.CreateDataChangeEventSubscriptionOutput;
42 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.remote.rev140114.CreateNotificationStreamInput;
43 import org.opendaylight.yang.gen.v1.urn.sal.restconf.event.subscription.rev140708.CreateDataChangeEventSubscriptionInput1.Scope;
44 import org.opendaylight.yang.gen.v1.urn.sal.restconf.event.subscription.rev140708.NotificationOutputTypeGrouping;
45 import org.opendaylight.yang.gen.v1.urn.sal.restconf.event.subscription.rev140708.NotificationOutputTypeGrouping.NotificationOutputType;
46 import org.opendaylight.yangtools.yang.common.ErrorTag;
47 import org.opendaylight.yangtools.yang.common.ErrorType;
48 import org.opendaylight.yangtools.yang.common.QName;
49 import org.opendaylight.yangtools.yang.common.QNameModule;
50 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
51 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifier;
52 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifierWithPredicates;
53 import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
54 import org.opendaylight.yangtools.yang.data.api.schema.DataContainerChild;
55 import org.opendaylight.yangtools.yang.data.api.schema.LeafNode;
56 import org.opendaylight.yangtools.yang.data.api.schema.LeafSetEntryNode;
57 import org.opendaylight.yangtools.yang.data.api.schema.LeafSetNode;
58 import org.opendaylight.yangtools.yang.data.api.schema.MapEntryNode;
59 import org.opendaylight.yangtools.yang.data.impl.schema.Builders;
60 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
61 import org.opendaylight.yangtools.yang.model.api.EffectiveModelContext;
62 import org.opendaylight.yangtools.yang.model.api.NotificationDefinition;
63 import org.opendaylight.yangtools.yang.model.api.stmt.NotificationEffectiveStatement;
64 import org.opendaylight.yangtools.yang.model.api.stmt.SchemaNodeIdentifier.Absolute;
65 import org.slf4j.Logger;
66 import org.slf4j.LoggerFactory;
69 * This singleton class is responsible for creation, removal and searching for {@link ListenerAdapter} or
70 * {@link NotificationListenerAdapter} listeners.
72 // FIXME: furthermore, this should be tied to ietf-restconf-monitoring, as the Strings used in its maps are stream
73 // names. We essentially need a component which deals with allocation of stream names and their lifecycle and
74 // the contents of /restconf-state/streams.
75 public abstract sealed class ListenersBroker {
77 * A ListenersBroker working with Server-Sent Events.
79 public static final class ServerSentEvents extends ListenersBroker {
81 public URI prepareUriByStreamName(final UriInfo uriInfo, final String streamName) {
82 return uriInfo.getBaseUriBuilder()
83 .replacePath(URLConstants.BASE_PATH + '/' + URLConstants.STREAMS_SUBPATH + '/' + streamName)
89 * A ListenersBroker working with WebSockets.
91 public static final class WebSockets extends ListenersBroker {
93 public URI prepareUriByStreamName(final UriInfo uriInfo, final String streamName) {
94 final var scheme = switch (uriInfo.getAbsolutePath().getScheme()) {
95 // Secured HTTP goes to Secured WebSockets
96 case "https" -> "wss";
97 // Unsecured HTTP and others go to unsecured WebSockets
101 return uriInfo.getBaseUriBuilder()
103 .replacePath(URLConstants.BASE_PATH + '/' + URLConstants.STREAMS_SUBPATH + '/' + streamName)
109 * Holder of all handlers for notifications.
111 // FIXME: why do we even need this class?!
112 private record HandlersHolder(
113 @NonNull DOMDataBroker dataBroker,
114 @NonNull DOMNotificationService notificationService,
115 @NonNull DatabindProvider databindProvider) {
118 requireNonNull(dataBroker);
119 requireNonNull(notificationService);
120 requireNonNull(databindProvider);
124 // private static final QName LOCATION_QNAME = QName.create(Notifi.QNAME, "location").intern();
125 // private static final NodeIdentifier LOCATION_NODEID = NodeIdentifier.create(LOCATION_QNAME);
126 // private static final String STREAMS_PATH = "ietf-restconf-monitoring:restconf-state/streams";
127 // private static final String STREAM_PATH_PART = "/stream=";
128 // private static final String STREAM_PATH = STREAMS_PATH + STREAM_PATH_PART;
129 // private static final String STREAM_ACCESS_PATH_PART = "/access=";
130 // private static final String STREAM_LOCATION_PATH_PART = "/location";
132 // private final ListenersBroker listenersBroker;
133 // private final HandlersHolder handlersHolder;
135 // // FIXME: NETCONF:1102: do not instantiate this service
136 // new RestconfStreamsSubscriptionServiceImpl(dataBroker, notificationService, databindProvider,
140 // * Initialize holder of handlers with holders as parameters.
142 // * @param dataBroker {@link DOMDataBroker}
143 // * @param notificationService {@link DOMNotificationService}
144 // * @param databindProvider a {@link DatabindProvider}
145 // * @param listenersBroker a {@link ListenersBroker}
147 // public RestconfStreamsSubscriptionServiceImpl(final DOMDataBroker dataBroker,
148 // final DOMNotificationService notificationService, final DatabindProvider databindProvider,
149 // final ListenersBroker listenersBroker) {
150 // handlersHolder = new HandlersHolder(dataBroker, notificationService, databindProvider);
151 // this.listenersBroker = requireNonNull(listenersBroker);
155 // public Response subscribeToStream(final String identifier, final UriInfo uriInfo) {
156 // final var params = QueryParams.newReceiveEventsParams(uriInfo);
158 // final URI location;
159 // if (identifier.contains(RestconfStreamsConstants.DATA_SUBSCRIPTION)) {
160 // location = listenersBroker.subscribeToDataStream(identifier, uriInfo, params, handlersHolder);
161 // } else if (identifier.contains(RestconfStreamsConstants.NOTIFICATION_STREAM)) {
162 // location = listenersBroker.subscribeToYangStream(identifier, uriInfo, params, handlersHolder);
164 // final String msg = "Bad type of notification of sal-remote";
166 // throw new RestconfDocumentedException(msg);
169 // return Response.ok()
170 // .location(location)
171 // .entity(new NormalizedNodePayload(
172 // Inference.ofDataTreePath(handlersHolder.databindProvider().currentContext().modelContext(),
173 // Notifi.QNAME, LOCATION_QNAME),
174 // ImmutableNodes.leafNode(LOCATION_NODEID, location.toString())))
// Logger of this class
178 private static final Logger LOG = LoggerFactory.getLogger(ListenersBroker.class);
180 // Prefixes for stream names
181 private static final String DATA_SUBSCRIPTION = "data-change-event-subscription";
182 private static final String NOTIFICATION_STREAM = "notification-stream";
183 private static final String DEVICE_NOTIFICATION_STREAM = "device-notification-stream";
// YANG module of the sal-remote augmentation, taken from NotificationOutputTypeGrouping
185 private static final QNameModule SAL_REMOTE_AUGMENT = NotificationOutputTypeGrouping.QNAME.getModule();
// YANG module of the device notification RPC, taken from SubscribeDeviceNotificationInput
187 private static final QNameModule DEVICE_NOTIFICATION_MODULE = SubscribeDeviceNotificationInput.QNAME.getModule();
// QNames of the RPC input/output leaves handled by the stream-creating methods of this class
188 private static final QName DATASTORE_QNAME =
189 QName.create(SAL_REMOTE_AUGMENT, RestconfStreamsConstants.DATASTORE_PARAM_NAME).intern();
190 private static final QName SCOPE_QNAME =
191 QName.create(SAL_REMOTE_AUGMENT, RestconfStreamsConstants.SCOPE_PARAM_NAME).intern();
192 private static final QName OUTPUT_TYPE_QNAME =
193 QName.create(SAL_REMOTE_AUGMENT, "notification-output-type").intern();
194 private static final QName DEVICE_NOTIFICATION_PATH_QNAME =
195 QName.create(DEVICE_NOTIFICATION_MODULE, "path").intern();
196 private static final QName DEVICE_NOTIFICATION_STREAM_PATH =
197 QName.create(DEVICE_NOTIFICATION_PATH_QNAME, "stream-path").intern();
// Node identifiers used to look up children of RPC inputs and to build RPC outputs
198 private static final NodeIdentifier DATASTORE_NODEID = NodeIdentifier.create(DATASTORE_QNAME);
199 private static final NodeIdentifier SCOPE_NODEID = NodeIdentifier.create(SCOPE_QNAME);
200 private static final NodeIdentifier OUTPUT_TYPE_NODEID = NodeIdentifier.create(OUTPUT_TYPE_QNAME);
201 private static final NodeIdentifier DEVICE_NOTIFICATION_PATH_NODEID =
202 NodeIdentifier.create(DEVICE_NOTIFICATION_PATH_QNAME);
203 private static final NodeIdentifier SAL_REMOTE_OUTPUT_NODEID =
204 NodeIdentifier.create(CreateDataChangeEventSubscriptionOutput.QNAME);
205 private static final NodeIdentifier NOTIFICATIONS =
206 NodeIdentifier.create(QName.create(CreateNotificationStreamInput.QNAME, "notifications").intern());
207 private static final NodeIdentifier PATH_NODEID =
208 NodeIdentifier.create(QName.create(CreateDataChangeEventSubscriptionInput.QNAME, "path").intern());
209 private static final NodeIdentifier STREAM_NAME_NODEID =
210 NodeIdentifier.create(QName.create(CreateDataChangeEventSubscriptionOutput.QNAME, "stream-name").intern());
// One lock per listener registry; methods of this class take the read lock for lookups and the write lock for
// registration and removal in the registry of the same kind
212 private final StampedLock dataChangeListenersLock = new StampedLock();
213 private final StampedLock notificationListenersLock = new StampedLock();
214 private final StampedLock deviceNotificationListenersLock = new StampedLock();
// Registries mapping a stream name to its listener adapter; the inverse view is used to remove an entry when only
// the adapter is known
215 private final BiMap<String, ListenerAdapter> dataChangeListeners = HashBiMap.create();
216 private final BiMap<String, NotificationListenerAdapter> notificationListeners = HashBiMap.create();
217 private final BiMap<String, DeviceNotificationListenerAdaptor> deviceNotificationListeners = HashBiMap.create();
// Private constructor: the class is sealed and abstract, so instances can only come from the nested subclasses
// declared in this file (ServerSentEvents and WebSockets).
219 private ListenersBroker() {
224 * Gets {@link ListenerAdapter} specified by stream identification.
226 * @param streamName Stream name.
227 * @return {@link ListenerAdapter} specified by stream name or {@code null} if listener with specified stream name
229 * @throws NullPointerException in {@code streamName} is {@code null}
231 public final @Nullable ListenerAdapter dataChangeListenerFor(final String streamName) {
232 requireNonNull(streamName);
234 final long stamp = dataChangeListenersLock.readLock();
236 return dataChangeListeners.get(streamName);
238 dataChangeListenersLock.unlockRead(stamp);
243 * Gets {@link NotificationListenerAdapter} specified by stream name.
245 * @param streamName Stream name.
246 * @return {@link NotificationListenerAdapter} specified by stream name or {@code null} if listener with specified
247 * stream name does not exist.
248 * @throws NullPointerException in {@code streamName} is {@code null}
250 public final @Nullable NotificationListenerAdapter notificationListenerFor(final String streamName) {
251 requireNonNull(streamName);
253 final long stamp = notificationListenersLock.readLock();
255 return notificationListeners.get(streamName);
257 notificationListenersLock.unlockRead(stamp);
262 * Get listener for device path.
264 * @param streamName name.
265 * @return {@link DeviceNotificationListenerAdaptor} specified by stream name or {@code null} if listener with
266 * specified stream name does not exist.
267 * @throws NullPointerException in {@code path} is {@code null}
269 public final @Nullable DeviceNotificationListenerAdaptor deviceNotificationListenerFor(final String streamName) {
270 requireNonNull(streamName);
272 final long stamp = deviceNotificationListenersLock.readLock();
274 return deviceNotificationListeners.get(streamName);
276 deviceNotificationListenersLock.unlockRead(stamp);
281 * Get listener for stream-name.
283 * @param streamName Stream name.
284 * @return {@link NotificationListenerAdapter} or {@link ListenerAdapter} object wrapped in {@link Optional}
285 * or {@link Optional#empty()} if listener with specified stream name doesn't exist.
287 public final @Nullable AbstractStream<?> listenerFor(final String streamName) {
288 if (streamName.startsWith(NOTIFICATION_STREAM)) {
289 return notificationListenerFor(streamName);
290 } else if (streamName.startsWith(DATA_SUBSCRIPTION)) {
291 return dataChangeListenerFor(streamName);
292 } else if (streamName.startsWith(DEVICE_NOTIFICATION_STREAM)) {
293 return deviceNotificationListenerFor(streamName);
300 * Creates new {@link ListenerAdapter} listener using input stream name and path if such listener
301 * hasn't been created yet.
303 * @param path Path to data in data repository.
304 * @param outputType Specific type of output for notifications - XML or JSON.
305 * @return Created or existing data-change listener adapter.
307 public final ListenerAdapter registerDataChangeListener(final EffectiveModelContext modelContext,
308 final LogicalDatastoreType datastore, final YangInstanceIdentifier path, final Scope scope,
309 final NotificationOutputType outputType) {
310 final var sb = new StringBuilder(DATA_SUBSCRIPTION)
311 .append('/').append(createStreamNameFromUri(IdentifierCodec.serialize(path, modelContext)))
312 .append('/').append(RestconfStreamsConstants.DATASTORE_PARAM_NAME).append('=').append(datastore)
313 .append('/').append(RestconfStreamsConstants.SCOPE_PARAM_NAME).append('=').append(scope);
314 if (outputType != NotificationOutputType.XML) {
315 sb.append('/').append(outputType.getName());
318 final long stamp = dataChangeListenersLock.writeLock();
320 return dataChangeListeners.computeIfAbsent(sb.toString(),
321 streamName -> new ListenerAdapter(streamName, outputType, this, datastore, path));
323 dataChangeListenersLock.unlockWrite(stamp);
328 * Creates new {@link NotificationDefinition} listener using input stream name and schema path
329 * if such listener haven't been created yet.
331 * @param refSchemaCtx reference {@link EffectiveModelContext}
332 * @param notifications {@link QName}s of accepted YANG notifications
333 * @param outputType Specific type of output for notifications - XML or JSON.
334 * @return Created or existing notification listener adapter.
336 public final NotificationListenerAdapter registerNotificationListener(final EffectiveModelContext refSchemaCtx,
337 final ImmutableSet<QName> notifications, final NotificationOutputType outputType) {
338 final var sb = new StringBuilder(NOTIFICATION_STREAM).append('/');
339 var haveFirst = false;
340 for (var qname : notifications) {
341 final var module = refSchemaCtx.findModuleStatement(qname.getModule())
342 .orElseThrow(() -> new RestconfDocumentedException(qname + " refers to an unknown module",
343 ErrorType.APPLICATION, ErrorTag.INVALID_VALUE));
344 final var stmt = module.findSchemaTreeNode(qname)
345 .orElseThrow(() -> new RestconfDocumentedException(qname + " refers to an notification",
346 ErrorType.APPLICATION, ErrorTag.INVALID_VALUE));
347 if (!(stmt instanceof NotificationEffectiveStatement)) {
348 throw new RestconfDocumentedException(qname + " refers to a non-notification",
349 ErrorType.APPLICATION, ErrorTag.INVALID_VALUE);
357 sb.append(module.argument().getLocalName()).append(':').append(qname.getLocalName());
359 if (outputType != NotificationOutputType.XML) {
360 sb.append('/').append(outputType.getName());
363 final long stamp = notificationListenersLock.writeLock();
365 return notificationListeners.computeIfAbsent(sb.toString(),
366 streamName -> new NotificationListenerAdapter(streamName, outputType, this, notifications));
368 notificationListenersLock.unlockWrite(stamp);
373 * Creates new {@link DeviceNotificationListenerAdaptor} listener using input stream name and schema path
374 * if such listener haven't been created yet.
376 * @param deviceName Device name.
377 * @param outputType Specific type of output for notifications - XML or JSON.
378 * @param refSchemaCtx Schema context of node
379 * @param mountPointService Mount point service
380 * @return Created or existing device notification listener adapter.
382 private DeviceNotificationListenerAdaptor registerDeviceNotificationListener(final String deviceName,
383 final NotificationOutputType outputType, final EffectiveModelContext refSchemaCtx,
384 final DOMMountPointService mountPointService, final YangInstanceIdentifier path) {
385 final var sb = new StringBuilder(DEVICE_NOTIFICATION_STREAM).append('/')
388 final long stamp = deviceNotificationListenersLock.writeLock();
390 return deviceNotificationListeners.computeIfAbsent(sb.toString(),
391 streamName -> new DeviceNotificationListenerAdaptor(streamName, outputType, this, refSchemaCtx,
392 mountPointService, path));
394 deviceNotificationListenersLock.unlockWrite(stamp);
399 * Removal and closing of all data-change-event and notification listeners.
401 public final synchronized void removeAndCloseAllListeners() {
402 final long stampNotifications = notificationListenersLock.writeLock();
403 final long stampDataChanges = dataChangeListenersLock.writeLock();
405 removeAndCloseAllDataChangeListenersTemplate();
406 removeAndCloseAllNotificationListenersTemplate();
408 dataChangeListenersLock.unlockWrite(stampDataChanges);
409 notificationListenersLock.unlockWrite(stampNotifications);
414 * Closes and removes all data-change listeners.
416 public final void removeAndCloseAllDataChangeListeners() {
417 final long stamp = dataChangeListenersLock.writeLock();
419 removeAndCloseAllDataChangeListenersTemplate();
421 dataChangeListenersLock.unlockWrite(stamp);
425 @SuppressWarnings("checkstyle:IllegalCatch")
426 private void removeAndCloseAllDataChangeListenersTemplate() {
427 dataChangeListeners.values().forEach(listenerAdapter -> {
429 listenerAdapter.close();
430 } catch (Exception e) {
431 LOG.error("Failed to close data-change listener {}.", listenerAdapter, e);
432 throw new IllegalStateException("Failed to close data-change listener %s.".formatted(listenerAdapter),
436 dataChangeListeners.clear();
440 * Closes and removes all notification listeners.
442 public final void removeAndCloseAllNotificationListeners() {
443 final long stamp = notificationListenersLock.writeLock();
445 removeAndCloseAllNotificationListenersTemplate();
447 notificationListenersLock.unlockWrite(stamp);
451 @SuppressWarnings("checkstyle:IllegalCatch")
452 private void removeAndCloseAllNotificationListenersTemplate() {
453 notificationListeners.values().forEach(listenerAdapter -> {
455 listenerAdapter.close();
456 } catch (Exception e) {
457 LOG.error("Failed to close notification listener {}.", listenerAdapter, e);
458 throw new IllegalStateException("Failed to close notification listener %s.".formatted(listenerAdapter),
462 notificationListeners.clear();
466 * Removes and closes data-change listener of type {@link ListenerAdapter} specified in parameter.
468 * @param listener Listener to be closed and removed.
470 @SuppressWarnings("checkstyle:IllegalCatch")
471 public final void removeAndCloseDataChangeListener(final ListenerAdapter listener) {
472 final long stamp = dataChangeListenersLock.writeLock();
474 removeAndCloseDataChangeListenerTemplate(listener);
475 } catch (Exception exception) {
476 LOG.error("Data-change listener {} cannot be closed.", listener, exception);
478 dataChangeListenersLock.unlockWrite(stamp);
483 * Removes and closes data-change listener of type {@link ListenerAdapter} specified in parameter.
485 * @param listener Listener to be closed and removed.
487 private void removeAndCloseDataChangeListenerTemplate(final ListenerAdapter listener) {
489 requireNonNull(listener).close();
490 if (dataChangeListeners.inverse().remove(listener) == null) {
491 LOG.warn("There isn't any data-change event stream that would match listener adapter {}.", listener);
493 } catch (InterruptedException | ExecutionException e) {
494 LOG.error("Data-change listener {} cannot be closed.", listener, e);
495 throw new IllegalStateException("Data-change listener %s cannot be closed.".formatted(listener), e);
500 * Removes and closes notification listener of type {@link NotificationListenerAdapter} specified in parameter.
502 * @param listener Listener to be closed and removed.
504 @SuppressWarnings("checkstyle:IllegalCatch")
505 public final void removeAndCloseNotificationListener(final NotificationListenerAdapter listener) {
506 final long stamp = notificationListenersLock.writeLock();
508 removeAndCloseNotificationListenerTemplate(listener);
509 } catch (Exception e) {
510 LOG.error("Notification listener {} cannot be closed.", listener, e);
512 notificationListenersLock.unlockWrite(stamp);
517 * Removes and closes device notification listener of type {@link NotificationListenerAdapter}
518 * specified in parameter.
520 * @param listener Listener to be closed and removed.
522 @SuppressWarnings("checkstyle:IllegalCatch")
523 public final void removeAndCloseDeviceNotificationListener(final DeviceNotificationListenerAdaptor listener) {
524 final long stamp = deviceNotificationListenersLock.writeLock();
526 requireNonNull(listener);
527 if (deviceNotificationListeners.inverse().remove(listener) == null) {
528 LOG.warn("There isn't any device notification stream that would match listener adapter {}.", listener);
530 } catch (final Exception exception) {
531 LOG.error("Device Notification listener {} cannot be closed.", listener, exception);
533 deviceNotificationListenersLock.unlockWrite(stamp);
537 private void removeAndCloseNotificationListenerTemplate(final NotificationListenerAdapter listener) {
539 requireNonNull(listener).close();
540 if (notificationListeners.inverse().remove(listener) == null) {
541 LOG.warn("There isn't any notification stream that would match listener adapter {}.", listener);
543 } catch (InterruptedException | ExecutionException e) {
544 LOG.error("Notification listener {} cannot be closed.", listener, e);
545 throw new IllegalStateException("Notification listener %s cannot be closed.".formatted(listener), e);
550 * Removal and closing of general listener (data-change or notification listener).
552 * @param stream Stream to be closed and removed from cache.
554 final void removeAndCloseListener(final AbstractStream<?> stream) {
555 requireNonNull(stream);
556 if (stream instanceof ListenerAdapter dataChange) {
557 removeAndCloseDataChangeListener(dataChange);
558 } else if (stream instanceof NotificationListenerAdapter notification) {
559 removeAndCloseNotificationListener(notification);
564 * Creates string representation of stream name from URI. Removes slash from URI in start and end positions,
565 * and optionally {@link URLConstants#BASE_PATH} prefix.
567 * @param uri URI for creation of stream name.
568 * @return String representation of stream name.
570 private static String createStreamNameFromUri(final String uri) {
571 String result = requireNonNull(uri);
573 if (result.startsWith(URLConstants.BASE_PATH)) {
574 result = result.substring(URLConstants.BASE_PATH.length());
575 } else if (result.startsWith("/")) {
576 result = result.substring(1);
581 if (result.endsWith("/")) {
582 result = result.substring(0, result.length() - 1);
588 * Prepare URL from base name and stream name.
590 * @param uriInfo base URL information
591 * @param streamName name of stream for create
/**
 * Builds the URI under which the stream of the given name is offered to clients. Both implementations in this
 * file replace the path of the base URI with {@code BASE_PATH/STREAMS_SUBPATH/streamName}; the WebSockets one
 * additionally switches the scheme to {@code ws} or {@code wss}.
 *
 * @param uriInfo base URL information
 * @param streamName name of the stream to build the URI for
 * @return URI of the stream
 */
594 public abstract @NonNull URI prepareUriByStreamName(UriInfo uriInfo, String streamName);
597 * Register listener by streamName in identifier to listen to yang notifications, and put or delete information
598 * about listener to DS according to ietf-restconf-monitoring.
600 * @param identifier Name of the stream.
601 * @param uriInfo URI information.
602 * @param notificationQueryParams Query parameters of notification.
603 * @param handlersHolder Holder of handlers for notifications.
604 * @return Stream location for listening.
606 public final @NonNull URI subscribeToYangStream(final String identifier, final UriInfo uriInfo,
607 final ReceiveEventsParams notificationQueryParams, final HandlersHolder handlersHolder) {
608 final String streamName = createStreamNameFromUri(identifier);
609 if (isNullOrEmpty(streamName)) {
610 throw new RestconfDocumentedException("Stream name is empty.", ErrorType.PROTOCOL, ErrorTag.INVALID_VALUE);
613 final var notificationListenerAdapter = notificationListenerFor(streamName);
614 if (notificationListenerAdapter == null) {
615 throw new RestconfDocumentedException("Stream with name %s was not found.".formatted(streamName),
616 ErrorType.PROTOCOL, ErrorTag.UNKNOWN_ELEMENT);
619 final URI uri = prepareUriByStreamName(uriInfo, streamName);
620 notificationListenerAdapter.setQueryParams(notificationQueryParams);
621 notificationListenerAdapter.listen(handlersHolder.notificationService());
622 final DOMDataBroker dataBroker = handlersHolder.dataBroker();
623 notificationListenerAdapter.setCloseVars(dataBroker, handlersHolder.databindProvider());
624 final MapEntryNode mapToStreams = RestconfStateStreams.notificationStreamEntry(streamName,
625 notificationListenerAdapter.qnames(), notificationListenerAdapter.getOutputType(), uri);
627 // FIXME: how does this correlate with the transaction notificationListenerAdapter.close() will do?
628 final DOMDataTreeWriteTransaction writeTransaction = dataBroker.newWriteOnlyTransaction();
629 writeDataToDS(writeTransaction, mapToStreams);
630 submitData(writeTransaction);
635 * Register listener by streamName in identifier to listen to data change notifications, and put or delete
636 * information about listener to DS according to ietf-restconf-monitoring.
638 * @param identifier Identifier as stream name.
639 * @param uriInfo Base URI information.
640 * @param notificationQueryParams Query parameters of notification.
641 * @param handlersHolder Holder of handlers for notifications.
642 * @return Location for listening.
644 public final URI subscribeToDataStream(final String identifier, final UriInfo uriInfo,
645 final ReceiveEventsParams notificationQueryParams, final HandlersHolder handlersHolder) {
646 final var streamName = createStreamNameFromUri(identifier);
647 final var listener = dataChangeListenerFor(streamName);
648 if (listener == null) {
649 throw new RestconfDocumentedException("No listener found for stream " + streamName,
650 ErrorType.APPLICATION, ErrorTag.DATA_MISSING);
653 listener.setQueryParams(notificationQueryParams);
655 final var dataBroker = handlersHolder.dataBroker();
656 final var schemaHandler = handlersHolder.databindProvider();
657 listener.setCloseVars(dataBroker, schemaHandler);
658 listener.listen(dataBroker);
660 final var uri = prepareUriByStreamName(uriInfo, streamName);
661 final var schemaContext = schemaHandler.currentContext().modelContext();
662 final var serializedPath = IdentifierCodec.serialize(listener.getPath(), schemaContext);
664 final var mapToStreams = RestconfStateStreams.dataChangeStreamEntry(listener.getPath(),
665 listener.getOutputType(), uri, schemaContext, serializedPath);
666 final var writeTransaction = dataBroker.newWriteOnlyTransaction();
667 writeDataToDS(writeTransaction, mapToStreams);
668 submitData(writeTransaction);
672 // FIXME: callers are utter duplicates, refactor them
673 private static void writeDataToDS(final DOMDataTreeWriteOperations tx, final MapEntryNode mapToStreams) {
674 // FIXME: use put() here
675 tx.merge(LogicalDatastoreType.OPERATIONAL, RestconfStateStreams.restconfStateStreamPath(mapToStreams.name()),
679 private static void submitData(final DOMDataTreeWriteTransaction readWriteTransaction) {
681 readWriteTransaction.commit().get();
682 } catch (final InterruptedException | ExecutionException e) {
683 throw new RestconfDocumentedException("Problem while putting data to DS.", e);
689 * Create data-change-event stream with POST operation via RPC.
691 * @param input Input of RPC - example in JSON (data-change-event stream):
696 * "path": "/toaster:toaster/toaster:toasterStatus",
697 * "sal-remote-augment:datastore": "OPERATIONAL",
698 * "sal-remote-augment:scope": "ONE"
703 * @param modelContext Reference to {@link EffectiveModelContext}.
704 * @return {@link DOMRpcResult} - Output of RPC - example in JSON:
709 * "stream-name": "toaster:toaster/toaster:toasterStatus/datastore=OPERATIONAL/scope=ONE"
715 // FIXME: this really should be a normal RPC implementation
716 public final RestconfFuture<Optional<ContainerNode>> createDataChangeNotifiStream(final ContainerNode input,
717 final EffectiveModelContext modelContext) {
718 final var datastoreName = extractStringLeaf(input, DATASTORE_NODEID);
719 final var scopeName = extractStringLeaf(input, SCOPE_NODEID);
720 final var adapter = registerDataChangeListener(modelContext,
721 datastoreName != null ? LogicalDatastoreType.valueOf(datastoreName) : LogicalDatastoreType.CONFIGURATION,
722 preparePath(input), scopeName != null ? Scope.ofName(scopeName) : Scope.BASE, prepareOutputType(input));
724 // building of output
725 return RestconfFuture.of(Optional.of(Builders.containerBuilder()
726 .withNodeIdentifier(SAL_REMOTE_OUTPUT_NODEID)
727 .withChild(ImmutableNodes.leafNode(STREAM_NAME_NODEID, adapter.getStreamName()))
// Creates (or reuses) a YANG notification stream for the notifications listed in the RPC input and reports the
// name of the resulting stream in the RPC output container.
731 // FIXME: this really should be a normal RPC implementation
732 public final RestconfFuture<Optional<ContainerNode>> createNotificationStream(final ContainerNode input,
733 final EffectiveModelContext modelContext) {
// Read the entries of the "notifications" leaf-set of the input; the child is cast without a prior type check
734 final var qnames = ((LeafSetNode<String>) input.getChildByArg(NOTIFICATIONS)).body().stream()
735 .map(LeafSetEntryNode::body)
// NOTE(review): the pipeline steps between the mapping above and the collection below are not visible here -
//               presumably they turn each String into a QName; confirm against the complete file
738 .collect(ImmutableSet.toImmutableSet());
// Reject the request as a whole if any of the names is not a notification known to the model context
740 for (var qname : qnames) {
741 if (modelContext.findNotification(qname).isEmpty()) {
742 throw new RestconfDocumentedException(qname + " refers to an unknown notification",
743 ErrorType.APPLICATION, ErrorTag.INVALID_VALUE);
747 // registration of the listener
748 final var adapter = registerNotificationListener(modelContext, qnames, prepareOutputType(input));
// The output container carries a single leaf holding the name of the stream
750 return RestconfFuture.of(Optional.of(Builders.containerBuilder()
751 .withNodeIdentifier(SAL_REMOTE_OUTPUT_NODEID)
752 .withChild(ImmutableNodes.leafNode(STREAM_NAME_NODEID, adapter.getStreamName()))
757 * Create device notification stream.
759 * @param baseUrl base Url
760 * @param input RPC input
761 * @param mountPointService dom mount point service
762 * @return {@link DOMRpcResult} - Output of RPC - example in JSON
764 // FIXME: this should be an RPC invocation
765 public final RestconfFuture<Optional<ContainerNode>> createDeviceNotificationListener(final ContainerNode input,
766 final String baseUrl, final DOMMountPointService mountPointService) {
767 // parsing out of container with settings and path
769 final var path = (YangInstanceIdentifier) input.findChildByArg(DEVICE_NOTIFICATION_PATH_NODEID)
770 .map(DataContainerChild::body)
771 .orElseThrow(() -> new RestconfDocumentedException("No path specified", ErrorType.APPLICATION,
772 ErrorTag.DATA_MISSING));
774 if (!(path.getLastPathArgument() instanceof NodeIdentifierWithPredicates listId)) {
775 throw new RestconfDocumentedException("Path does not refer to a list item", ErrorType.APPLICATION,
776 ErrorTag.INVALID_VALUE);
778 if (listId.size() != 1) {
779 throw new RestconfDocumentedException("Target list uses multiple keys", ErrorType.APPLICATION,
780 ErrorTag.INVALID_VALUE);
782 final String deviceName = listId.values().iterator().next().toString();
784 final DOMMountPoint mountPoint = mountPointService.getMountPoint(path)
785 .orElseThrow(() -> new RestconfDocumentedException("Mount point not available", ErrorType.APPLICATION,
786 ErrorTag.OPERATION_FAILED));
788 final DOMNotificationService mountNotifService = mountPoint.getService(DOMNotificationService.class)
789 .orElseThrow(() -> new RestconfDocumentedException("Mount point does not support notifications",
790 ErrorType.APPLICATION, ErrorTag.OPERATION_FAILED));
792 final var mountModelContext = mountPoint.getService(DOMSchemaService.class)
793 .orElseThrow(() -> new RestconfDocumentedException("Mount point schema not available",
794 ErrorType.APPLICATION, ErrorTag.OPERATION_FAILED))
796 final var notificationPaths = mountModelContext.getModuleStatements().values().stream()
797 .flatMap(module -> module.streamEffectiveSubstatements(NotificationEffectiveStatement.class))
798 .map(notification -> Absolute.of(notification.argument()))
799 .collect(ImmutableSet.toImmutableSet());
800 if (notificationPaths.isEmpty()) {
801 throw new RestconfDocumentedException("Device does not support notification", ErrorType.APPLICATION,
802 ErrorTag.OPERATION_FAILED);
805 final var notificationListenerAdapter = registerDeviceNotificationListener(deviceName,
806 prepareOutputType(input), mountModelContext, mountPointService, mountPoint.getIdentifier());
807 notificationListenerAdapter.listen(mountNotifService, notificationPaths);
809 return RestconfFuture.of(Optional.of(Builders.containerBuilder()
810 .withNodeIdentifier(new NodeIdentifier(SubscribeDeviceNotificationOutput.QNAME))
811 .withChild(ImmutableNodes.leafNode(DEVICE_NOTIFICATION_STREAM_PATH,
812 baseUrl + notificationListenerAdapter.getStreamName()))
817 * Prepare {@link NotificationOutputType}.
819 * @param data Container with stream settings (RPC create-stream).
820 * @return Parsed {@link NotificationOutputType}.
822 private static NotificationOutputType prepareOutputType(final ContainerNode data) {
823 final String outputName = extractStringLeaf(data, OUTPUT_TYPE_NODEID);
824 return outputName != null ? NotificationOutputType.valueOf(outputName) : NotificationOutputType.XML;
828 * Prepare {@link YangInstanceIdentifier} of stream source.
830 * @param data Container with stream settings (RPC create-stream).
831 * @return Parsed {@link YangInstanceIdentifier} of data element from which the data-change-event notifications
832 * are going to be generated.
834 private static YangInstanceIdentifier preparePath(final ContainerNode data) {
835 final var pathLeaf = data.childByArg(PATH_NODEID);
836 if (pathLeaf != null && pathLeaf.body() instanceof YangInstanceIdentifier pathValue) {
840 throw new RestconfDocumentedException("Instance identifier was not normalized correctly",
841 ErrorType.APPLICATION, ErrorTag.OPERATION_FAILED);
844 private static @Nullable String extractStringLeaf(final ContainerNode data, final NodeIdentifier childName) {
845 return data.childByArg(childName) instanceof LeafNode<?> leafNode && leafNode.body() instanceof String str