/*
 * Copyright © 2019 FRINX s.r.o. All rights reserved.
 *
 * This program and the accompanying materials are made available under the
 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
 * and is available at http://www.eclipse.org/legal/epl-v10.html
 */
8 package org.opendaylight.restconf.nb.rfc8040.streams;
import static com.google.common.base.Strings.isNullOrEmpty;
import static java.util.Objects.requireNonNull;

import com.google.common.collect.BiMap;
import com.google.common.collect.HashBiMap;
import com.google.common.collect.ImmutableSet;
import java.net.URI;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.locks.StampedLock;
import javax.ws.rs.core.UriInfo;
import org.eclipse.jdt.annotation.NonNull;
import org.eclipse.jdt.annotation.Nullable;
import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
import org.opendaylight.mdsal.dom.api.DOMDataBroker;
import org.opendaylight.mdsal.dom.api.DOMDataTreeWriteOperations;
import org.opendaylight.mdsal.dom.api.DOMDataTreeWriteTransaction;
import org.opendaylight.mdsal.dom.api.DOMMountPointService;
import org.opendaylight.restconf.common.errors.RestconfDocumentedException;
import org.opendaylight.restconf.nb.rfc8040.NotificationQueryParams;
import org.opendaylight.restconf.nb.rfc8040.URLConstants;
import org.opendaylight.restconf.nb.rfc8040.monitoring.RestconfStateStreams;
import org.opendaylight.restconf.nb.rfc8040.rests.services.impl.RestconfStreamsSubscriptionServiceImpl.HandlersHolder;
import org.opendaylight.restconf.nb.rfc8040.rests.utils.RestconfStreamsConstants;
import org.opendaylight.restconf.nb.rfc8040.utils.parser.IdentifierCodec;
import org.opendaylight.yang.gen.v1.urn.sal.restconf.event.subscription.rev140708.CreateDataChangeEventSubscriptionInput1.Scope;
import org.opendaylight.yang.gen.v1.urn.sal.restconf.event.subscription.rev140708.NotificationOutputTypeGrouping.NotificationOutputType;
import org.opendaylight.yangtools.yang.common.ErrorTag;
import org.opendaylight.yangtools.yang.common.ErrorType;
import org.opendaylight.yangtools.yang.common.QName;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
import org.opendaylight.yangtools.yang.data.api.schema.MapEntryNode;
import org.opendaylight.yangtools.yang.model.api.EffectiveModelContext;
import org.opendaylight.yangtools.yang.model.api.NotificationDefinition;
import org.opendaylight.yangtools.yang.model.api.stmt.NotificationEffectiveStatement;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * This singleton class is responsible for creation, removal and searching for {@link ListenerAdapter} or
 * {@link NotificationListenerAdapter} listeners.
 */
// FIXME: furthermore, this should be tied to ietf-restconf-monitoring, as the Strings used in its maps are stream
//        names. We essentially need a component which deals with allocation of stream names and their lifecycle and
//        the contents of /restconf-state/streams.
55 public abstract sealed class ListenersBroker {
57 * A ListenersBroker working with Server-Sent Events.
59 public static final class ServerSentEvents extends ListenersBroker {
61 public URI prepareUriByStreamName(final UriInfo uriInfo, final String streamName) {
62 return uriInfo.getBaseUriBuilder()
63 .replacePath(URLConstants.BASE_PATH + '/' + URLConstants.SSE_SUBPATH + '/' + streamName)
69 * A ListenersBroker working with WebSockets.
71 public static final class WebSockets extends ListenersBroker {
73 public URI prepareUriByStreamName(final UriInfo uriInfo, final String streamName) {
74 final var scheme = switch (uriInfo.getAbsolutePath().getScheme()) {
75 // Secured HTTP goes to Secured WebSockets
76 case "https" -> "wss";
77 // Unsecured HTTP and others go to unsecured WebSockets
81 return uriInfo.getBaseUriBuilder()
83 .replacePath(URLConstants.BASE_PATH + '/' + streamName)
private static final Logger LOG = LoggerFactory.getLogger(ListenersBroker.class);

// Each listener registry below is guarded by the lock sharing its name prefix. The maps are bidirectional so that
// a listener can be removed by instance through inverse().
private final StampedLock dataChangeListenersLock = new StampedLock();
private final StampedLock notificationListenersLock = new StampedLock();
private final StampedLock deviceNotificationListenersLock = new StampedLock();
private final BiMap<String, ListenerAdapter> dataChangeListeners = HashBiMap.create();
private final BiMap<String, NotificationListenerAdapter> notificationListeners = HashBiMap.create();
private final BiMap<String, DeviceNotificationListenerAdaptor> deviceNotificationListeners = HashBiMap.create();

private ListenersBroker() {
    // Hidden on purpose: only the nested subclasses may instantiate this class.
}
102 * Gets {@link ListenerAdapter} specified by stream identification.
104 * @param streamName Stream name.
105 * @return {@link ListenerAdapter} specified by stream name or {@code null} if listener with specified stream name
107 * @throws NullPointerException in {@code streamName} is {@code null}
109 public final @Nullable ListenerAdapter dataChangeListenerFor(final String streamName) {
110 requireNonNull(streamName);
112 final long stamp = dataChangeListenersLock.readLock();
114 return dataChangeListeners.get(streamName);
116 dataChangeListenersLock.unlockRead(stamp);
121 * Gets {@link NotificationListenerAdapter} specified by stream name.
123 * @param streamName Stream name.
124 * @return {@link NotificationListenerAdapter} specified by stream name or {@code null} if listener with specified
125 * stream name does not exist.
126 * @throws NullPointerException in {@code streamName} is {@code null}
128 public final @Nullable NotificationListenerAdapter notificationListenerFor(final String streamName) {
129 requireNonNull(streamName);
131 final long stamp = notificationListenersLock.readLock();
133 return notificationListeners.get(streamName);
135 notificationListenersLock.unlockRead(stamp);
140 * Get listener for device path.
142 * @param streamName name.
143 * @return {@link DeviceNotificationListenerAdaptor} specified by stream name or {@code null} if listener with
144 * specified stream name does not exist.
145 * @throws NullPointerException in {@code path} is {@code null}
147 public final @Nullable DeviceNotificationListenerAdaptor deviceNotificationListenerFor(final String streamName) {
148 requireNonNull(streamName);
150 final long stamp = deviceNotificationListenersLock.readLock();
152 return deviceNotificationListeners.get(streamName);
154 deviceNotificationListenersLock.unlockRead(stamp);
159 * Get listener for stream-name.
161 * @param streamName Stream name.
162 * @return {@link NotificationListenerAdapter} or {@link ListenerAdapter} object wrapped in {@link Optional}
163 * or {@link Optional#empty()} if listener with specified stream name doesn't exist.
165 public final @Nullable BaseListenerInterface listenerFor(final String streamName) {
166 if (streamName.startsWith(RestconfStreamsConstants.NOTIFICATION_STREAM)) {
167 return notificationListenerFor(streamName);
168 } else if (streamName.startsWith(RestconfStreamsConstants.DATA_SUBSCRIPTION)) {
169 return dataChangeListenerFor(streamName);
170 } else if (streamName.startsWith(RestconfStreamsConstants.DEVICE_NOTIFICATION_STREAM)) {
171 return deviceNotificationListenerFor(streamName);
178 * Creates new {@link ListenerAdapter} listener using input stream name and path if such listener
179 * hasn't been created yet.
181 * @param path Path to data in data repository.
182 * @param outputType Specific type of output for notifications - XML or JSON.
183 * @return Created or existing data-change listener adapter.
185 public final ListenerAdapter registerDataChangeListener(final EffectiveModelContext modelContext,
186 final LogicalDatastoreType datastore, final YangInstanceIdentifier path, final Scope scope,
187 final NotificationOutputType outputType) {
188 final var sb = new StringBuilder(RestconfStreamsConstants.DATA_SUBSCRIPTION)
189 .append('/').append(createStreamNameFromUri(IdentifierCodec.serialize(path, modelContext)))
190 .append('/').append(RestconfStreamsConstants.DATASTORE_PARAM_NAME).append('=').append(datastore)
191 .append('/').append(RestconfStreamsConstants.SCOPE_PARAM_NAME).append('=').append(scope);
192 if (outputType != NotificationOutputType.XML) {
193 sb.append('/').append(outputType.getName());
196 final long stamp = dataChangeListenersLock.writeLock();
198 return dataChangeListeners.computeIfAbsent(sb.toString(),
199 streamName -> new ListenerAdapter(datastore, path, streamName, outputType, this));
201 dataChangeListenersLock.unlockWrite(stamp);
206 * Creates new {@link NotificationDefinition} listener using input stream name and schema path
207 * if such listener haven't been created yet.
209 * @param refSchemaCtx reference {@link EffectiveModelContext}
210 * @param notifications {@link QName}s of accepted YANG notifications
211 * @param outputType Specific type of output for notifications - XML or JSON.
212 * @return Created or existing notification listener adapter.
214 public final NotificationListenerAdapter registerNotificationListener(final EffectiveModelContext refSchemaCtx,
215 final ImmutableSet<QName> notifications, final NotificationOutputType outputType) {
216 final var sb = new StringBuilder(RestconfStreamsConstants.NOTIFICATION_STREAM).append('/');
217 var haveFirst = false;
218 for (var qname : notifications) {
219 final var module = refSchemaCtx.findModuleStatement(qname.getModule())
220 .orElseThrow(() -> new RestconfDocumentedException(qname + " refers to an unknown module",
221 ErrorType.APPLICATION, ErrorTag.INVALID_VALUE));
222 final var stmt = module.findSchemaTreeNode(qname)
223 .orElseThrow(() -> new RestconfDocumentedException(qname + " refers to an notification",
224 ErrorType.APPLICATION, ErrorTag.INVALID_VALUE));
225 if (!(stmt instanceof NotificationEffectiveStatement)) {
226 throw new RestconfDocumentedException(qname + " refers to a non-notification",
227 ErrorType.APPLICATION, ErrorTag.INVALID_VALUE);
235 sb.append(module.argument().getLocalName()).append(':').append(qname.getLocalName());
237 if (outputType != NotificationOutputType.XML) {
238 sb.append('/').append(outputType.getName());
241 final long stamp = notificationListenersLock.writeLock();
243 return notificationListeners.computeIfAbsent(sb.toString(),
244 streamName -> new NotificationListenerAdapter(notifications, streamName, outputType, this));
246 notificationListenersLock.unlockWrite(stamp);
251 * Creates new {@link DeviceNotificationListenerAdaptor} listener using input stream name and schema path
252 * if such listener haven't been created yet.
254 * @param deviceName Device name.
255 * @param outputType Specific type of output for notifications - XML or JSON.
256 * @param refSchemaCtx Schema context of node
257 * @param mountPointService Mount point service
258 * @return Created or existing device notification listener adapter.
260 public final DeviceNotificationListenerAdaptor registerDeviceNotificationListener(final String deviceName,
261 final NotificationOutputType outputType, final EffectiveModelContext refSchemaCtx,
262 final DOMMountPointService mountPointService, final YangInstanceIdentifier path) {
263 final var sb = new StringBuilder(RestconfStreamsConstants.DEVICE_NOTIFICATION_STREAM).append('/')
266 final long stamp = deviceNotificationListenersLock.writeLock();
268 return deviceNotificationListeners.computeIfAbsent(sb.toString(),
269 streamName -> new DeviceNotificationListenerAdaptor(streamName, outputType, refSchemaCtx,
270 mountPointService, path, this));
272 deviceNotificationListenersLock.unlockWrite(stamp);
277 * Removal and closing of all data-change-event and notification listeners.
279 public final synchronized void removeAndCloseAllListeners() {
280 final long stampNotifications = notificationListenersLock.writeLock();
281 final long stampDataChanges = dataChangeListenersLock.writeLock();
283 removeAndCloseAllDataChangeListenersTemplate();
284 removeAndCloseAllNotificationListenersTemplate();
286 dataChangeListenersLock.unlockWrite(stampDataChanges);
287 notificationListenersLock.unlockWrite(stampNotifications);
292 * Closes and removes all data-change listeners.
294 public final void removeAndCloseAllDataChangeListeners() {
295 final long stamp = dataChangeListenersLock.writeLock();
297 removeAndCloseAllDataChangeListenersTemplate();
299 dataChangeListenersLock.unlockWrite(stamp);
303 @SuppressWarnings("checkstyle:IllegalCatch")
304 private void removeAndCloseAllDataChangeListenersTemplate() {
305 dataChangeListeners.values().forEach(listenerAdapter -> {
307 listenerAdapter.close();
308 } catch (Exception e) {
309 LOG.error("Failed to close data-change listener {}.", listenerAdapter, e);
310 throw new IllegalStateException("Failed to close data-change listener %s.".formatted(listenerAdapter),
314 dataChangeListeners.clear();
318 * Closes and removes all notification listeners.
320 public final void removeAndCloseAllNotificationListeners() {
321 final long stamp = notificationListenersLock.writeLock();
323 removeAndCloseAllNotificationListenersTemplate();
325 notificationListenersLock.unlockWrite(stamp);
329 @SuppressWarnings("checkstyle:IllegalCatch")
330 private void removeAndCloseAllNotificationListenersTemplate() {
331 notificationListeners.values().forEach(listenerAdapter -> {
333 listenerAdapter.close();
334 } catch (Exception e) {
335 LOG.error("Failed to close notification listener {}.", listenerAdapter, e);
336 throw new IllegalStateException("Failed to close notification listener %s.".formatted(listenerAdapter),
340 notificationListeners.clear();
344 * Removes and closes data-change listener of type {@link ListenerAdapter} specified in parameter.
346 * @param listener Listener to be closed and removed.
348 @SuppressWarnings("checkstyle:IllegalCatch")
349 public final void removeAndCloseDataChangeListener(final ListenerAdapter listener) {
350 final long stamp = dataChangeListenersLock.writeLock();
352 removeAndCloseDataChangeListenerTemplate(listener);
353 } catch (Exception exception) {
354 LOG.error("Data-change listener {} cannot be closed.", listener, exception);
356 dataChangeListenersLock.unlockWrite(stamp);
361 * Removes and closes data-change listener of type {@link ListenerAdapter} specified in parameter.
363 * @param listener Listener to be closed and removed.
365 private void removeAndCloseDataChangeListenerTemplate(final ListenerAdapter listener) {
367 requireNonNull(listener).close();
368 if (dataChangeListeners.inverse().remove(listener) == null) {
369 LOG.warn("There isn't any data-change event stream that would match listener adapter {}.", listener);
371 } catch (InterruptedException | ExecutionException e) {
372 LOG.error("Data-change listener {} cannot be closed.", listener, e);
373 throw new IllegalStateException("Data-change listener %s cannot be closed.".formatted(listener), e);
378 * Removes and closes notification listener of type {@link NotificationListenerAdapter} specified in parameter.
380 * @param listener Listener to be closed and removed.
382 @SuppressWarnings("checkstyle:IllegalCatch")
383 public final void removeAndCloseNotificationListener(final NotificationListenerAdapter listener) {
384 final long stamp = notificationListenersLock.writeLock();
386 removeAndCloseNotificationListenerTemplate(listener);
387 } catch (Exception e) {
388 LOG.error("Notification listener {} cannot be closed.", listener, e);
390 notificationListenersLock.unlockWrite(stamp);
395 * Removes and closes device notification listener of type {@link NotificationListenerAdapter}
396 * specified in parameter.
398 * @param listener Listener to be closed and removed.
400 @SuppressWarnings("checkstyle:IllegalCatch")
401 public final void removeAndCloseDeviceNotificationListener(final DeviceNotificationListenerAdaptor listener) {
402 final long stamp = deviceNotificationListenersLock.writeLock();
404 requireNonNull(listener);
405 if (deviceNotificationListeners.inverse().remove(listener) == null) {
406 LOG.warn("There isn't any device notification stream that would match listener adapter {}.", listener);
408 } catch (final Exception exception) {
409 LOG.error("Device Notification listener {} cannot be closed.", listener, exception);
411 deviceNotificationListenersLock.unlockWrite(stamp);
415 private void removeAndCloseNotificationListenerTemplate(final NotificationListenerAdapter listener) {
417 requireNonNull(listener).close();
418 if (notificationListeners.inverse().remove(listener) == null) {
419 LOG.warn("There isn't any notification stream that would match listener adapter {}.", listener);
421 } catch (InterruptedException | ExecutionException e) {
422 LOG.error("Notification listener {} cannot be closed.", listener, e);
423 throw new IllegalStateException("Notification listener %s cannot be closed.".formatted(listener), e);
428 * Removal and closing of general listener (data-change or notification listener).
430 * @param listener Listener to be closed and removed from cache.
432 final void removeAndCloseListener(final BaseListenerInterface listener) {
433 requireNonNull(listener);
434 if (listener instanceof ListenerAdapter) {
435 removeAndCloseDataChangeListener((ListenerAdapter) listener);
436 } else if (listener instanceof NotificationListenerAdapter) {
437 removeAndCloseNotificationListener((NotificationListenerAdapter) listener);
442 * Creates string representation of stream name from URI. Removes slash from URI in start and end positions,
443 * and optionally {@link URLConstants#BASE_PATH} prefix.
445 * @param uri URI for creation of stream name.
446 * @return String representation of stream name.
448 public static String createStreamNameFromUri(final String uri) {
449 String result = requireNonNull(uri);
451 if (result.startsWith(URLConstants.BASE_PATH)) {
452 result = result.substring(URLConstants.BASE_PATH.length());
453 } else if (result.startsWith("/")) {
454 result = result.substring(1);
459 if (result.endsWith("/")) {
460 result = result.substring(0, result.length() - 1);
/**
 * Prepare URL from base name and stream name. Each concrete broker supplies its own transport-specific
 * implementation.
 *
 * @param uriInfo base URL information
 * @param streamName name of stream for create
 * @return final URL, never {@code null}
 */
public abstract @NonNull URI prepareUriByStreamName(UriInfo uriInfo, String streamName);
475 * Register listener by streamName in identifier to listen to yang notifications, and put or delete information
476 * about listener to DS according to ietf-restconf-monitoring.
478 * @param identifier Name of the stream.
479 * @param uriInfo URI information.
480 * @param notificationQueryParams Query parameters of notification.
481 * @param handlersHolder Holder of handlers for notifications.
482 * @return Stream location for listening.
484 public final @NonNull URI subscribeToYangStream(final String identifier, final UriInfo uriInfo,
485 final NotificationQueryParams notificationQueryParams, final HandlersHolder handlersHolder) {
486 final String streamName = createStreamNameFromUri(identifier);
487 if (isNullOrEmpty(streamName)) {
488 throw new RestconfDocumentedException("Stream name is empty.", ErrorType.PROTOCOL, ErrorTag.INVALID_VALUE);
491 final var notificationListenerAdapter = notificationListenerFor(streamName);
492 if (notificationListenerAdapter == null) {
493 throw new RestconfDocumentedException("Stream with name %s was not found.".formatted(streamName),
494 ErrorType.PROTOCOL, ErrorTag.UNKNOWN_ELEMENT);
497 final URI uri = prepareUriByStreamName(uriInfo, streamName);
498 notificationListenerAdapter.setQueryParams(notificationQueryParams);
499 notificationListenerAdapter.listen(handlersHolder.notificationService());
500 final DOMDataBroker dataBroker = handlersHolder.dataBroker();
501 notificationListenerAdapter.setCloseVars(dataBroker, handlersHolder.databindProvider());
502 final MapEntryNode mapToStreams = RestconfStateStreams.notificationStreamEntry(streamName,
503 notificationListenerAdapter.qnames(), notificationListenerAdapter.getStart(),
504 notificationListenerAdapter.getOutputType(), uri);
506 // FIXME: how does this correlate with the transaction notificationListenerAdapter.close() will do?
507 final DOMDataTreeWriteTransaction writeTransaction = dataBroker.newWriteOnlyTransaction();
508 writeDataToDS(writeTransaction, mapToStreams);
509 submitData(writeTransaction);
514 * Register listener by streamName in identifier to listen to data change notifications, and put or delete
515 * information about listener to DS according to ietf-restconf-monitoring.
517 * @param identifier Identifier as stream name.
518 * @param uriInfo Base URI information.
519 * @param notificationQueryParams Query parameters of notification.
520 * @param handlersHolder Holder of handlers for notifications.
521 * @return Location for listening.
523 public final URI subscribeToDataStream(final String identifier, final UriInfo uriInfo,
524 final NotificationQueryParams notificationQueryParams, final HandlersHolder handlersHolder) {
525 final var streamName = createStreamNameFromUri(identifier);
526 final var listener = dataChangeListenerFor(streamName);
527 if (listener == null) {
528 throw new RestconfDocumentedException("No listener found for stream " + streamName,
529 ErrorType.APPLICATION, ErrorTag.DATA_MISSING);
532 listener.setQueryParams(notificationQueryParams);
534 final var dataBroker = handlersHolder.dataBroker();
535 final var schemaHandler = handlersHolder.databindProvider();
536 listener.setCloseVars(dataBroker, schemaHandler);
537 listener.listen(dataBroker);
539 final var uri = prepareUriByStreamName(uriInfo, streamName);
540 final var schemaContext = schemaHandler.currentContext().modelContext();
541 final var serializedPath = IdentifierCodec.serialize(listener.getPath(), schemaContext);
543 final var mapToStreams = RestconfStateStreams.dataChangeStreamEntry(listener.getPath(),
544 listener.getStart(), listener.getOutputType(), uri, schemaContext, serializedPath);
545 final var writeTransaction = dataBroker.newWriteOnlyTransaction();
546 writeDataToDS(writeTransaction, mapToStreams);
547 submitData(writeTransaction);
551 // FIXME: callers are utter duplicates, refactor them
552 private static void writeDataToDS(final DOMDataTreeWriteOperations tx, final MapEntryNode mapToStreams) {
553 // FIXME: use put() here
554 tx.merge(LogicalDatastoreType.OPERATIONAL, RestconfStateStreams.restconfStateStreamPath(mapToStreams.name()),
558 private static void submitData(final DOMDataTreeWriteTransaction readWriteTransaction) {
560 readWriteTransaction.commit().get();
561 } catch (final InterruptedException | ExecutionException e) {
562 throw new RestconfDocumentedException("Problem while putting data to DS.", e);