/*
 * Copyright © 2019 FRINX s.r.o. All rights reserved.
 *
 * This program and the accompanying materials are made available under the
 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
 * and is available at http://www.eclipse.org/legal/epl-v10.html
 */
8 package org.opendaylight.restconf.nb.rfc8040.streams;
10 import static com.google.common.base.Strings.isNullOrEmpty;
11 import static java.util.Objects.requireNonNull;
13 import com.google.common.collect.BiMap;
14 import com.google.common.collect.HashBiMap;
15 import com.google.common.collect.ImmutableSet;
17 import java.util.Optional;
18 import java.util.concurrent.ExecutionException;
19 import java.util.concurrent.locks.StampedLock;
20 import javax.ws.rs.core.UriInfo;
21 import org.eclipse.jdt.annotation.NonNull;
22 import org.eclipse.jdt.annotation.Nullable;
23 import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
24 import org.opendaylight.mdsal.dom.api.DOMDataBroker;
25 import org.opendaylight.mdsal.dom.api.DOMDataTreeWriteOperations;
26 import org.opendaylight.mdsal.dom.api.DOMDataTreeWriteTransaction;
27 import org.opendaylight.mdsal.dom.api.DOMMountPointService;
28 import org.opendaylight.restconf.common.errors.RestconfDocumentedException;
29 import org.opendaylight.restconf.nb.rfc8040.NotificationQueryParams;
30 import org.opendaylight.restconf.nb.rfc8040.URLConstants;
31 import org.opendaylight.restconf.nb.rfc8040.rests.services.impl.RestconfStreamsSubscriptionServiceImpl.HandlersHolder;
32 import org.opendaylight.restconf.nb.rfc8040.utils.parser.IdentifierCodec;
33 import org.opendaylight.yang.gen.v1.urn.sal.restconf.event.subscription.rev140708.CreateDataChangeEventSubscriptionInput1.Scope;
34 import org.opendaylight.yang.gen.v1.urn.sal.restconf.event.subscription.rev140708.NotificationOutputTypeGrouping.NotificationOutputType;
35 import org.opendaylight.yangtools.yang.common.ErrorTag;
36 import org.opendaylight.yangtools.yang.common.ErrorType;
37 import org.opendaylight.yangtools.yang.common.QName;
38 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
39 import org.opendaylight.yangtools.yang.data.api.schema.MapEntryNode;
40 import org.opendaylight.yangtools.yang.model.api.EffectiveModelContext;
41 import org.opendaylight.yangtools.yang.model.api.NotificationDefinition;
42 import org.opendaylight.yangtools.yang.model.api.stmt.NotificationEffectiveStatement;
43 import org.slf4j.Logger;
44 import org.slf4j.LoggerFactory;
/**
 * This singleton class is responsible for creation, removal and searching for {@link ListenerAdapter} or
 * {@link NotificationListenerAdapter} listeners.
 */
// FIXME: this should be tied to ietf-restconf-monitoring, as the Strings used in its maps are stream
//        names. We essentially need a component which deals with allocation of stream names and their lifecycle and
//        the contents of /restconf-state/streams.
53 public abstract sealed class ListenersBroker {
55 * A ListenersBroker working with Server-Sent Events.
57 public static final class ServerSentEvents extends ListenersBroker {
59 public URI prepareUriByStreamName(final UriInfo uriInfo, final String streamName) {
60 return uriInfo.getBaseUriBuilder()
61 .replacePath(URLConstants.BASE_PATH + '/' + URLConstants.STREAMS_SUBPATH + '/' + streamName)
67 * A ListenersBroker working with WebSockets.
69 public static final class WebSockets extends ListenersBroker {
71 public URI prepareUriByStreamName(final UriInfo uriInfo, final String streamName) {
72 final var scheme = switch (uriInfo.getAbsolutePath().getScheme()) {
73 // Secured HTTP goes to Secured WebSockets
74 case "https" -> "wss";
75 // Unsecured HTTP and others go to unsecured WebSockets
79 return uriInfo.getBaseUriBuilder()
81 .replacePath(URLConstants.BASE_PATH + '/' + URLConstants.STREAMS_SUBPATH + '/' + streamName)
86 private static final Logger LOG = LoggerFactory.getLogger(ListenersBroker.class);
88 private final StampedLock dataChangeListenersLock = new StampedLock();
89 private final StampedLock notificationListenersLock = new StampedLock();
90 private final StampedLock deviceNotificationListenersLock = new StampedLock();
91 private final BiMap<String, ListenerAdapter> dataChangeListeners = HashBiMap.create();
92 private final BiMap<String, NotificationListenerAdapter> notificationListeners = HashBiMap.create();
93 private final BiMap<String, DeviceNotificationListenerAdaptor> deviceNotificationListeners = HashBiMap.create();
private ListenersBroker() {
    // Hidden on purpose: instances are created only through the nested subclasses declared in this file
}
100 * Gets {@link ListenerAdapter} specified by stream identification.
102 * @param streamName Stream name.
103 * @return {@link ListenerAdapter} specified by stream name or {@code null} if listener with specified stream name
105 * @throws NullPointerException in {@code streamName} is {@code null}
107 public final @Nullable ListenerAdapter dataChangeListenerFor(final String streamName) {
108 requireNonNull(streamName);
110 final long stamp = dataChangeListenersLock.readLock();
112 return dataChangeListeners.get(streamName);
114 dataChangeListenersLock.unlockRead(stamp);
119 * Gets {@link NotificationListenerAdapter} specified by stream name.
121 * @param streamName Stream name.
122 * @return {@link NotificationListenerAdapter} specified by stream name or {@code null} if listener with specified
123 * stream name does not exist.
124 * @throws NullPointerException in {@code streamName} is {@code null}
126 public final @Nullable NotificationListenerAdapter notificationListenerFor(final String streamName) {
127 requireNonNull(streamName);
129 final long stamp = notificationListenersLock.readLock();
131 return notificationListeners.get(streamName);
133 notificationListenersLock.unlockRead(stamp);
138 * Get listener for device path.
140 * @param streamName name.
141 * @return {@link DeviceNotificationListenerAdaptor} specified by stream name or {@code null} if listener with
142 * specified stream name does not exist.
143 * @throws NullPointerException in {@code path} is {@code null}
145 public final @Nullable DeviceNotificationListenerAdaptor deviceNotificationListenerFor(final String streamName) {
146 requireNonNull(streamName);
148 final long stamp = deviceNotificationListenersLock.readLock();
150 return deviceNotificationListeners.get(streamName);
152 deviceNotificationListenersLock.unlockRead(stamp);
157 * Get listener for stream-name.
159 * @param streamName Stream name.
160 * @return {@link NotificationListenerAdapter} or {@link ListenerAdapter} object wrapped in {@link Optional}
161 * or {@link Optional#empty()} if listener with specified stream name doesn't exist.
163 public final @Nullable BaseListenerInterface listenerFor(final String streamName) {
164 if (streamName.startsWith(RestconfStreamsConstants.NOTIFICATION_STREAM)) {
165 return notificationListenerFor(streamName);
166 } else if (streamName.startsWith(RestconfStreamsConstants.DATA_SUBSCRIPTION)) {
167 return dataChangeListenerFor(streamName);
168 } else if (streamName.startsWith(RestconfStreamsConstants.DEVICE_NOTIFICATION_STREAM)) {
169 return deviceNotificationListenerFor(streamName);
176 * Creates new {@link ListenerAdapter} listener using input stream name and path if such listener
177 * hasn't been created yet.
179 * @param path Path to data in data repository.
180 * @param outputType Specific type of output for notifications - XML or JSON.
181 * @return Created or existing data-change listener adapter.
183 public final ListenerAdapter registerDataChangeListener(final EffectiveModelContext modelContext,
184 final LogicalDatastoreType datastore, final YangInstanceIdentifier path, final Scope scope,
185 final NotificationOutputType outputType) {
186 final var sb = new StringBuilder(RestconfStreamsConstants.DATA_SUBSCRIPTION)
187 .append('/').append(createStreamNameFromUri(IdentifierCodec.serialize(path, modelContext)))
188 .append('/').append(RestconfStreamsConstants.DATASTORE_PARAM_NAME).append('=').append(datastore)
189 .append('/').append(RestconfStreamsConstants.SCOPE_PARAM_NAME).append('=').append(scope);
190 if (outputType != NotificationOutputType.XML) {
191 sb.append('/').append(outputType.getName());
194 final long stamp = dataChangeListenersLock.writeLock();
196 return dataChangeListeners.computeIfAbsent(sb.toString(),
197 streamName -> new ListenerAdapter(datastore, path, streamName, outputType, this));
199 dataChangeListenersLock.unlockWrite(stamp);
204 * Creates new {@link NotificationDefinition} listener using input stream name and schema path
205 * if such listener haven't been created yet.
207 * @param refSchemaCtx reference {@link EffectiveModelContext}
208 * @param notifications {@link QName}s of accepted YANG notifications
209 * @param outputType Specific type of output for notifications - XML or JSON.
210 * @return Created or existing notification listener adapter.
212 public final NotificationListenerAdapter registerNotificationListener(final EffectiveModelContext refSchemaCtx,
213 final ImmutableSet<QName> notifications, final NotificationOutputType outputType) {
214 final var sb = new StringBuilder(RestconfStreamsConstants.NOTIFICATION_STREAM).append('/');
215 var haveFirst = false;
216 for (var qname : notifications) {
217 final var module = refSchemaCtx.findModuleStatement(qname.getModule())
218 .orElseThrow(() -> new RestconfDocumentedException(qname + " refers to an unknown module",
219 ErrorType.APPLICATION, ErrorTag.INVALID_VALUE));
220 final var stmt = module.findSchemaTreeNode(qname)
221 .orElseThrow(() -> new RestconfDocumentedException(qname + " refers to an notification",
222 ErrorType.APPLICATION, ErrorTag.INVALID_VALUE));
223 if (!(stmt instanceof NotificationEffectiveStatement)) {
224 throw new RestconfDocumentedException(qname + " refers to a non-notification",
225 ErrorType.APPLICATION, ErrorTag.INVALID_VALUE);
233 sb.append(module.argument().getLocalName()).append(':').append(qname.getLocalName());
235 if (outputType != NotificationOutputType.XML) {
236 sb.append('/').append(outputType.getName());
239 final long stamp = notificationListenersLock.writeLock();
241 return notificationListeners.computeIfAbsent(sb.toString(),
242 streamName -> new NotificationListenerAdapter(notifications, streamName, outputType, this));
244 notificationListenersLock.unlockWrite(stamp);
249 * Creates new {@link DeviceNotificationListenerAdaptor} listener using input stream name and schema path
250 * if such listener haven't been created yet.
252 * @param deviceName Device name.
253 * @param outputType Specific type of output for notifications - XML or JSON.
254 * @param refSchemaCtx Schema context of node
255 * @param mountPointService Mount point service
256 * @return Created or existing device notification listener adapter.
258 public final DeviceNotificationListenerAdaptor registerDeviceNotificationListener(final String deviceName,
259 final NotificationOutputType outputType, final EffectiveModelContext refSchemaCtx,
260 final DOMMountPointService mountPointService, final YangInstanceIdentifier path) {
261 final var sb = new StringBuilder(RestconfStreamsConstants.DEVICE_NOTIFICATION_STREAM).append('/')
264 final long stamp = deviceNotificationListenersLock.writeLock();
266 return deviceNotificationListeners.computeIfAbsent(sb.toString(),
267 streamName -> new DeviceNotificationListenerAdaptor(streamName, outputType, refSchemaCtx,
268 mountPointService, path, this));
270 deviceNotificationListenersLock.unlockWrite(stamp);
275 * Removal and closing of all data-change-event and notification listeners.
277 public final synchronized void removeAndCloseAllListeners() {
278 final long stampNotifications = notificationListenersLock.writeLock();
279 final long stampDataChanges = dataChangeListenersLock.writeLock();
281 removeAndCloseAllDataChangeListenersTemplate();
282 removeAndCloseAllNotificationListenersTemplate();
284 dataChangeListenersLock.unlockWrite(stampDataChanges);
285 notificationListenersLock.unlockWrite(stampNotifications);
290 * Closes and removes all data-change listeners.
292 public final void removeAndCloseAllDataChangeListeners() {
293 final long stamp = dataChangeListenersLock.writeLock();
295 removeAndCloseAllDataChangeListenersTemplate();
297 dataChangeListenersLock.unlockWrite(stamp);
301 @SuppressWarnings("checkstyle:IllegalCatch")
302 private void removeAndCloseAllDataChangeListenersTemplate() {
303 dataChangeListeners.values().forEach(listenerAdapter -> {
305 listenerAdapter.close();
306 } catch (Exception e) {
307 LOG.error("Failed to close data-change listener {}.", listenerAdapter, e);
308 throw new IllegalStateException("Failed to close data-change listener %s.".formatted(listenerAdapter),
312 dataChangeListeners.clear();
316 * Closes and removes all notification listeners.
318 public final void removeAndCloseAllNotificationListeners() {
319 final long stamp = notificationListenersLock.writeLock();
321 removeAndCloseAllNotificationListenersTemplate();
323 notificationListenersLock.unlockWrite(stamp);
327 @SuppressWarnings("checkstyle:IllegalCatch")
328 private void removeAndCloseAllNotificationListenersTemplate() {
329 notificationListeners.values().forEach(listenerAdapter -> {
331 listenerAdapter.close();
332 } catch (Exception e) {
333 LOG.error("Failed to close notification listener {}.", listenerAdapter, e);
334 throw new IllegalStateException("Failed to close notification listener %s.".formatted(listenerAdapter),
338 notificationListeners.clear();
342 * Removes and closes data-change listener of type {@link ListenerAdapter} specified in parameter.
344 * @param listener Listener to be closed and removed.
346 @SuppressWarnings("checkstyle:IllegalCatch")
347 public final void removeAndCloseDataChangeListener(final ListenerAdapter listener) {
348 final long stamp = dataChangeListenersLock.writeLock();
350 removeAndCloseDataChangeListenerTemplate(listener);
351 } catch (Exception exception) {
352 LOG.error("Data-change listener {} cannot be closed.", listener, exception);
354 dataChangeListenersLock.unlockWrite(stamp);
359 * Removes and closes data-change listener of type {@link ListenerAdapter} specified in parameter.
361 * @param listener Listener to be closed and removed.
363 private void removeAndCloseDataChangeListenerTemplate(final ListenerAdapter listener) {
365 requireNonNull(listener).close();
366 if (dataChangeListeners.inverse().remove(listener) == null) {
367 LOG.warn("There isn't any data-change event stream that would match listener adapter {}.", listener);
369 } catch (InterruptedException | ExecutionException e) {
370 LOG.error("Data-change listener {} cannot be closed.", listener, e);
371 throw new IllegalStateException("Data-change listener %s cannot be closed.".formatted(listener), e);
376 * Removes and closes notification listener of type {@link NotificationListenerAdapter} specified in parameter.
378 * @param listener Listener to be closed and removed.
380 @SuppressWarnings("checkstyle:IllegalCatch")
381 public final void removeAndCloseNotificationListener(final NotificationListenerAdapter listener) {
382 final long stamp = notificationListenersLock.writeLock();
384 removeAndCloseNotificationListenerTemplate(listener);
385 } catch (Exception e) {
386 LOG.error("Notification listener {} cannot be closed.", listener, e);
388 notificationListenersLock.unlockWrite(stamp);
393 * Removes and closes device notification listener of type {@link NotificationListenerAdapter}
394 * specified in parameter.
396 * @param listener Listener to be closed and removed.
398 @SuppressWarnings("checkstyle:IllegalCatch")
399 public final void removeAndCloseDeviceNotificationListener(final DeviceNotificationListenerAdaptor listener) {
400 final long stamp = deviceNotificationListenersLock.writeLock();
402 requireNonNull(listener);
403 if (deviceNotificationListeners.inverse().remove(listener) == null) {
404 LOG.warn("There isn't any device notification stream that would match listener adapter {}.", listener);
406 } catch (final Exception exception) {
407 LOG.error("Device Notification listener {} cannot be closed.", listener, exception);
409 deviceNotificationListenersLock.unlockWrite(stamp);
413 private void removeAndCloseNotificationListenerTemplate(final NotificationListenerAdapter listener) {
415 requireNonNull(listener).close();
416 if (notificationListeners.inverse().remove(listener) == null) {
417 LOG.warn("There isn't any notification stream that would match listener adapter {}.", listener);
419 } catch (InterruptedException | ExecutionException e) {
420 LOG.error("Notification listener {} cannot be closed.", listener, e);
421 throw new IllegalStateException("Notification listener %s cannot be closed.".formatted(listener), e);
426 * Removal and closing of general listener (data-change or notification listener).
428 * @param listener Listener to be closed and removed from cache.
430 final void removeAndCloseListener(final BaseListenerInterface listener) {
431 requireNonNull(listener);
432 if (listener instanceof ListenerAdapter) {
433 removeAndCloseDataChangeListener((ListenerAdapter) listener);
434 } else if (listener instanceof NotificationListenerAdapter) {
435 removeAndCloseNotificationListener((NotificationListenerAdapter) listener);
440 * Creates string representation of stream name from URI. Removes slash from URI in start and end positions,
441 * and optionally {@link URLConstants#BASE_PATH} prefix.
443 * @param uri URI for creation of stream name.
444 * @return String representation of stream name.
446 private static String createStreamNameFromUri(final String uri) {
447 String result = requireNonNull(uri);
449 if (result.startsWith(URLConstants.BASE_PATH)) {
450 result = result.substring(URLConstants.BASE_PATH.length());
451 } else if (result.startsWith("/")) {
452 result = result.substring(1);
457 if (result.endsWith("/")) {
458 result = result.substring(0, result.length() - 1);
464 * Prepare URL from base name and stream name.
466 * @param uriInfo base URL information
467 * @param streamName name of stream for create
470 public abstract @NonNull URI prepareUriByStreamName(UriInfo uriInfo, String streamName);
473 * Register listener by streamName in identifier to listen to yang notifications, and put or delete information
474 * about listener to DS according to ietf-restconf-monitoring.
476 * @param identifier Name of the stream.
477 * @param uriInfo URI information.
478 * @param notificationQueryParams Query parameters of notification.
479 * @param handlersHolder Holder of handlers for notifications.
480 * @return Stream location for listening.
482 public final @NonNull URI subscribeToYangStream(final String identifier, final UriInfo uriInfo,
483 final NotificationQueryParams notificationQueryParams, final HandlersHolder handlersHolder) {
484 final String streamName = createStreamNameFromUri(identifier);
485 if (isNullOrEmpty(streamName)) {
486 throw new RestconfDocumentedException("Stream name is empty.", ErrorType.PROTOCOL, ErrorTag.INVALID_VALUE);
489 final var notificationListenerAdapter = notificationListenerFor(streamName);
490 if (notificationListenerAdapter == null) {
491 throw new RestconfDocumentedException("Stream with name %s was not found.".formatted(streamName),
492 ErrorType.PROTOCOL, ErrorTag.UNKNOWN_ELEMENT);
495 final URI uri = prepareUriByStreamName(uriInfo, streamName);
496 notificationListenerAdapter.setQueryParams(notificationQueryParams);
497 notificationListenerAdapter.listen(handlersHolder.notificationService());
498 final DOMDataBroker dataBroker = handlersHolder.dataBroker();
499 notificationListenerAdapter.setCloseVars(dataBroker, handlersHolder.databindProvider());
500 final MapEntryNode mapToStreams = RestconfStateStreams.notificationStreamEntry(streamName,
501 notificationListenerAdapter.qnames(), notificationListenerAdapter.getStart(),
502 notificationListenerAdapter.getOutputType(), uri);
504 // FIXME: how does this correlate with the transaction notificationListenerAdapter.close() will do?
505 final DOMDataTreeWriteTransaction writeTransaction = dataBroker.newWriteOnlyTransaction();
506 writeDataToDS(writeTransaction, mapToStreams);
507 submitData(writeTransaction);
512 * Register listener by streamName in identifier to listen to data change notifications, and put or delete
513 * information about listener to DS according to ietf-restconf-monitoring.
515 * @param identifier Identifier as stream name.
516 * @param uriInfo Base URI information.
517 * @param notificationQueryParams Query parameters of notification.
518 * @param handlersHolder Holder of handlers for notifications.
519 * @return Location for listening.
521 public final URI subscribeToDataStream(final String identifier, final UriInfo uriInfo,
522 final NotificationQueryParams notificationQueryParams, final HandlersHolder handlersHolder) {
523 final var streamName = createStreamNameFromUri(identifier);
524 final var listener = dataChangeListenerFor(streamName);
525 if (listener == null) {
526 throw new RestconfDocumentedException("No listener found for stream " + streamName,
527 ErrorType.APPLICATION, ErrorTag.DATA_MISSING);
530 listener.setQueryParams(notificationQueryParams);
532 final var dataBroker = handlersHolder.dataBroker();
533 final var schemaHandler = handlersHolder.databindProvider();
534 listener.setCloseVars(dataBroker, schemaHandler);
535 listener.listen(dataBroker);
537 final var uri = prepareUriByStreamName(uriInfo, streamName);
538 final var schemaContext = schemaHandler.currentContext().modelContext();
539 final var serializedPath = IdentifierCodec.serialize(listener.getPath(), schemaContext);
541 final var mapToStreams = RestconfStateStreams.dataChangeStreamEntry(listener.getPath(),
542 listener.getStart(), listener.getOutputType(), uri, schemaContext, serializedPath);
543 final var writeTransaction = dataBroker.newWriteOnlyTransaction();
544 writeDataToDS(writeTransaction, mapToStreams);
545 submitData(writeTransaction);
549 // FIXME: callers are utter duplicates, refactor them
550 private static void writeDataToDS(final DOMDataTreeWriteOperations tx, final MapEntryNode mapToStreams) {
551 // FIXME: use put() here
552 tx.merge(LogicalDatastoreType.OPERATIONAL, RestconfStateStreams.restconfStateStreamPath(mapToStreams.name()),
556 private static void submitData(final DOMDataTreeWriteTransaction readWriteTransaction) {
558 readWriteTransaction.commit().get();
559 } catch (final InterruptedException | ExecutionException e) {
560 throw new RestconfDocumentedException("Problem while putting data to DS.", e);