Expose streams with all supported encodings
[netconf.git] / restconf / restconf-nb / src / main / java / org / opendaylight / restconf / nb / rfc8040 / streams / ListenersBroker.java
1 /*
2  * Copyright © 2019 FRINX s.r.o. All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.restconf.nb.rfc8040.streams;
9
10 import static java.util.Objects.requireNonNull;
11
12 import com.google.common.annotations.VisibleForTesting;
13 import com.google.common.collect.ImmutableSet;
14 import com.google.common.util.concurrent.FutureCallback;
15 import com.google.common.util.concurrent.MoreExecutors;
16 import java.net.URI;
17 import java.net.URISyntaxException;
18 import java.util.Optional;
19 import java.util.Set;
20 import java.util.UUID;
21 import java.util.concurrent.ConcurrentHashMap;
22 import java.util.concurrent.ConcurrentMap;
23 import org.eclipse.jdt.annotation.NonNull;
24 import org.eclipse.jdt.annotation.Nullable;
25 import org.opendaylight.mdsal.common.api.CommitInfo;
26 import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
27 import org.opendaylight.mdsal.dom.api.DOMDataBroker;
28 import org.opendaylight.mdsal.dom.api.DOMMountPointService;
29 import org.opendaylight.mdsal.dom.api.DOMNotificationService;
30 import org.opendaylight.mdsal.dom.api.DOMRpcResult;
31 import org.opendaylight.restconf.common.errors.RestconfDocumentedException;
32 import org.opendaylight.restconf.common.errors.RestconfFuture;
33 import org.opendaylight.restconf.common.errors.SettableRestconfFuture;
34 import org.opendaylight.restconf.nb.rfc8040.URLConstants;
35 import org.opendaylight.restconf.nb.rfc8040.databind.DatabindProvider;
36 import org.opendaylight.restconf.nb.rfc8040.streams.RestconfStream.EncodingName;
37 import org.opendaylight.restconf.nb.rfc8040.streams.RestconfStream.Source;
38 import org.opendaylight.restconf.nb.rfc8040.utils.parser.IdentifierCodec;
39 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.restconf.monitoring.rev170126.RestconfState;
40 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.restconf.monitoring.rev170126.restconf.state.Streams;
41 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.restconf.monitoring.rev170126.restconf.state.streams.Stream;
42 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.restconf.monitoring.rev170126.restconf.state.streams.stream.Access;
43 import org.opendaylight.yang.gen.v1.urn.opendaylight.device.notification.rev221106.SubscribeDeviceNotificationInput;
44 import org.opendaylight.yang.gen.v1.urn.opendaylight.device.notification.rev221106.SubscribeDeviceNotificationOutput;
45 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.remote.rev140114.CreateDataChangeEventSubscriptionInput;
46 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.remote.rev140114.CreateDataChangeEventSubscriptionOutput;
47 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.remote.rev140114.CreateNotificationStreamInput;
48 import org.opendaylight.yang.gen.v1.urn.sal.restconf.event.subscription.rev231103.CreateDataChangeEventSubscriptionInput1;
49 import org.opendaylight.yangtools.yang.common.ErrorTag;
50 import org.opendaylight.yangtools.yang.common.ErrorType;
51 import org.opendaylight.yangtools.yang.common.QName;
52 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
53 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifier;
54 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifierWithPredicates;
55 import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
56 import org.opendaylight.yangtools.yang.data.api.schema.DataContainerChild;
57 import org.opendaylight.yangtools.yang.data.api.schema.LeafNode;
58 import org.opendaylight.yangtools.yang.data.api.schema.LeafSetEntryNode;
59 import org.opendaylight.yangtools.yang.data.api.schema.LeafSetNode;
60 import org.opendaylight.yangtools.yang.data.api.schema.MapEntryNode;
61 import org.opendaylight.yangtools.yang.data.impl.schema.Builders;
62 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
63 import org.opendaylight.yangtools.yang.model.api.EffectiveModelContext;
64 import org.opendaylight.yangtools.yang.model.api.stmt.NotificationEffectiveStatement;
65 import org.slf4j.Logger;
66 import org.slf4j.LoggerFactory;
67
68 /**
69  * This singleton class is responsible for creation, removal and searching for {@link DataTreeChangeSource} or
70  * {@link NotificationSource} listeners.
71  */
72 // FIXME: furthermore, this should be tied to ietf-restconf-monitoring, as the Strings used in its maps are stream
73 //        names. We essentially need a component which deals with allocation of stream names and their lifecycle and
74 //        the contents of /restconf-state/streams.
75 public abstract sealed class ListenersBroker {
76     /**
77      * A ListenersBroker working with Server-Sent Events.
78      */
79     public static final class ServerSentEvents extends ListenersBroker {
80         public ServerSentEvents(final DOMDataBroker dataBroker, final DOMNotificationService notificationService,
81                 final DOMMountPointService mountPointService) {
82             super(dataBroker, notificationService, mountPointService);
83         }
84     }
85
86     /**
87      * A ListenersBroker working with WebSockets.
88      */
89     public static final class WebSockets extends ListenersBroker {
90         public WebSockets(final DOMDataBroker dataBroker, final DOMNotificationService notificationService,
91                 final DOMMountPointService mountPointService) {
92             super(dataBroker, notificationService, mountPointService);
93         }
94
95         @Override
96         String streamsScheme(final URI baseURI) {
97             return switch (super.streamsScheme(baseURI)) {
98                 // Secured HTTP goes to Secured WebSockets
99                 case "https" -> "wss";
100                 // Unsecured HTTP and others go to unsecured WebSockets
101                 default -> "ws";
102             };
103         }
104     }
105
106     private static final Logger LOG = LoggerFactory.getLogger(ListenersBroker.class);
107     private static final YangInstanceIdentifier RESTCONF_STATE_STREAMS = YangInstanceIdentifier.of(
108         NodeIdentifier.create(RestconfState.QNAME),
109         NodeIdentifier.create(Streams.QNAME),
110         NodeIdentifier.create(Stream.QNAME));
111
112     @VisibleForTesting
113     static final QName NAME_QNAME =  QName.create(Stream.QNAME, "name").intern();
114     @VisibleForTesting
115     static final QName DESCRIPTION_QNAME = QName.create(Stream.QNAME, "description").intern();
116     @VisibleForTesting
117     static final QName ENCODING_QNAME =  QName.create(Stream.QNAME, "encoding").intern();
118     @VisibleForTesting
119     static final QName LOCATION_QNAME =  QName.create(Stream.QNAME, "location").intern();
120
121     private static final NodeIdentifier DATASTORE_NODEID = NodeIdentifier.create(
122         QName.create(CreateDataChangeEventSubscriptionInput1.QNAME, "datastore").intern());
123     private static final NodeIdentifier DEVICE_NOTIFICATION_PATH_NODEID =
124         NodeIdentifier.create(QName.create(SubscribeDeviceNotificationInput.QNAME, "path").intern());
125     private static final NodeIdentifier DEVICE_NOTIFICATION_STREAM_PATH_NODEID =
126         NodeIdentifier.create(QName.create(SubscribeDeviceNotificationInput.QNAME, "stream-path").intern());
127
128     private static final NodeIdentifier SAL_REMOTE_OUTPUT_NODEID =
129         NodeIdentifier.create(CreateDataChangeEventSubscriptionOutput.QNAME);
130     private static final NodeIdentifier NOTIFICATIONS =
131         NodeIdentifier.create(QName.create(CreateNotificationStreamInput.QNAME, "notifications").intern());
132     private static final NodeIdentifier PATH_NODEID =
133         NodeIdentifier.create(QName.create(CreateDataChangeEventSubscriptionInput.QNAME, "path").intern());
134     private static final NodeIdentifier STREAM_NAME_NODEID =
135         NodeIdentifier.create(QName.create(CreateDataChangeEventSubscriptionOutput.QNAME, "stream-name").intern());
136
137     private final ConcurrentMap<String, RestconfStream<?>> streams = new ConcurrentHashMap<>();
138     private final DOMDataBroker dataBroker;
139     @Deprecated(forRemoval = true)
140     private final DOMMountPointService mountPointService;
141     @Deprecated(forRemoval = true)
142     private final DOMNotificationService notificationService;
143
144     private ListenersBroker(final DOMDataBroker dataBroker, final DOMNotificationService notificationService,
145             final DOMMountPointService mountPointService) {
146         this.dataBroker = requireNonNull(dataBroker);
147         this.notificationService = requireNonNull(notificationService);
148         this.mountPointService = requireNonNull(mountPointService);
149     }
150
151     /**
152      * Get a {@link RestconfStream} by its name.
153      *
154      * @param streamName Stream name.
155      * @return A {@link RestconfStream}, or {@code null} if the stream with specified name does not exist.
156      * @throws NullPointerException if {@code streamName} is {@code null}
157      */
158     public final @Nullable RestconfStream<?> getStream(final String streamName) {
159         return streams.get(streamName);
160     }
161
162     /**
163      * Create a {@link RestconfStream} with a unique name. This method will atomically generate a stream name, create
164      * the corresponding instance and register it.
165      *
166      * @param <T> Stream type
167      * @param baseStreamLocation base streams location
168      * @param factory Factory for creating the actual stream instance
169      * @return A {@link RestconfStream} instance
170      * @throws NullPointerException if {@code factory} is {@code null}
171      */
172     final <T> @NonNull RestconfFuture<RestconfStream<T>> createStream(final String description,
173             final String baseStreamLocation, final Source<T> source) {
174         final var stream = allocateStream(source);
175         final var name = stream.name();
176
177         // Now issue a put operation
178         final var ret = new SettableRestconfFuture<RestconfStream<T>>();
179         final var tx = dataBroker.newWriteOnlyTransaction();
180         tx.put(LogicalDatastoreType.OPERATIONAL, restconfStateStreamPath(name),
181             streamEntry(name, description, baseStreamLocation, stream.encodings()));
182         tx.commit().addCallback(new FutureCallback<CommitInfo>() {
183             @Override
184             public void onSuccess(final CommitInfo result) {
185                 LOG.debug("Stream {} added", name);
186                 ret.set(stream);
187             }
188
189             @Override
190             public void onFailure(final Throwable cause) {
191                 LOG.debug("Failed to add stream {}", name, cause);
192                 streams.remove(name, stream);
193                 ret.setFailure(new RestconfDocumentedException("Failed to allocate stream " + name, cause));
194             }
195         }, MoreExecutors.directExecutor());
196         return ret;
197     }
198
199     private <T> @NonNull RestconfStream<T> allocateStream(final Source<T> source) {
200         String name;
201         RestconfStream<T> stream;
202         do {
203             // Use Type 4 (random) UUID. While we could just use it as a plain string, be nice to observers and anchor
204             // it into UUID URN namespace as defined by RFC4122
205             name = "urn:uuid:" + UUID.randomUUID().toString();
206             stream = new RestconfStream<>(this, source, name);
207         } while (streams.putIfAbsent(name, stream) != null);
208
209         return stream;
210     }
211
212     /**
213      * Remove a particular stream and remove its entry from operational datastore.
214      *
215      * @param stream Stream to remove
216      */
217     final void removeStream(final RestconfStream<?> stream) {
218         // Defensive check to see if we are still tracking the stream
219         final var streamName = stream.name();
220         if (streams.get(streamName) != stream) {
221             LOG.warn("Stream {} does not match expected instance {}, skipping datastore update", streamName, stream);
222             return;
223         }
224
225         // Now issue a delete operation while the name is still protected by being associated in the map.
226         final var tx = dataBroker.newWriteOnlyTransaction();
227         tx.delete(LogicalDatastoreType.OPERATIONAL, restconfStateStreamPath(streamName));
228         tx.commit().addCallback(new FutureCallback<CommitInfo>() {
229             @Override
230             public void onSuccess(final CommitInfo result) {
231                 LOG.debug("Stream {} removed", streamName);
232                 streams.remove(streamName, stream);
233             }
234
235             @Override
236             public void onFailure(final Throwable cause) {
237                 LOG.warn("Failed to remove stream {}, operational datastore may be inconsistent", streamName, cause);
238                 streams.remove(streamName, stream);
239             }
240         }, MoreExecutors.directExecutor());
241     }
242
243     private static @NonNull YangInstanceIdentifier restconfStateStreamPath(final String streamName) {
244         return RESTCONF_STATE_STREAMS.node(NodeIdentifierWithPredicates.of(Stream.QNAME, NAME_QNAME, streamName));
245     }
246
247     /**
248      * Return the base location URL of the streams service based on request URI.
249      *
250      * @param baseURI request base URI
251      * @throws IllegalArgumentException if the result would have been malformed
252      */
253     public final @NonNull String baseStreamLocation(final URI baseURI) {
254         try {
255             return new URI(streamsScheme(baseURI), baseURI.getRawUserInfo(), baseURI.getHost(), baseURI.getPort(),
256                 URLConstants.BASE_PATH + '/' + URLConstants.STREAMS_SUBPATH, null, null)
257                 .toString();
258         } catch (URISyntaxException e) {
259             throw new IllegalArgumentException("Cannot derive streams location", e);
260         }
261     }
262
263     String streamsScheme(final URI baseURI) {
264         return baseURI.getScheme();
265     }
266
267     /**
268      * Create data-change-event stream with POST operation via RPC.
269      *
270      * @param input Input of RPC - example in JSON (data-change-event stream):
271      *              <pre>
272      *              {@code
273      *                  {
274      *                      "input": {
275      *                          "path": "/toaster:toaster/toaster:toasterStatus",
276      *                          "sal-remote-augment:datastore": "OPERATIONAL",
277      *                      }
278      *                  }
279      *              }
280      *              </pre>
281      * @param modelContext Reference to {@link EffectiveModelContext}.
282      * @return {@link DOMRpcResult} - Output of RPC - example in JSON:
283      *     <pre>
284      *     {@code
285      *         {
286      *             "output": {
287      *                 "stream-name": "toaster:toaster/toaster:toasterStatus/datastore=OPERATIONAL/scope=ONE"
288      *             }
289      *         }
290      *     }
291      *     </pre>
292      */
293     // FIXME: this really should be a normal RPC implementation
294     public final RestconfFuture<Optional<ContainerNode>> createDataChangeNotifiStream(
295             final DatabindProvider databindProvider, final URI baseURI, final ContainerNode input,
296             final EffectiveModelContext modelContext) {
297         final var datastoreName = extractStringLeaf(input, DATASTORE_NODEID);
298         final var datastore = datastoreName != null ? LogicalDatastoreType.valueOf(datastoreName)
299             : LogicalDatastoreType.CONFIGURATION;
300         final var path = preparePath(input);
301
302         return createStream(
303             "Events occuring in " + datastore + " datastore under /" + IdentifierCodec.serialize(path, modelContext),
304             baseStreamLocation(baseURI), new DataTreeChangeSource(databindProvider, dataBroker, datastore, path))
305             .transform(stream -> Optional.of(Builders.containerBuilder()
306                 .withNodeIdentifier(SAL_REMOTE_OUTPUT_NODEID)
307                 .withChild(ImmutableNodes.leafNode(STREAM_NAME_NODEID, stream.name()))
308                 .build()));
309     }
310
311     // FIXME: this really should be a normal RPC implementation
312     public final RestconfFuture<Optional<ContainerNode>> createNotificationStream(
313             final DatabindProvider databindProvider, final URI baseURI, final ContainerNode input,
314             final EffectiveModelContext modelContext) {
315         final var qnames = ((LeafSetNode<String>) input.getChildByArg(NOTIFICATIONS)).body().stream()
316             .map(LeafSetEntryNode::body)
317             .map(QName::create)
318             .sorted()
319             .collect(ImmutableSet.toImmutableSet());
320
321         final var description = new StringBuilder("YANG notifications matching any of {");
322         var haveFirst = false;
323         for (var qname : qnames) {
324             final var module = modelContext.findModuleStatement(qname.getModule())
325                 .orElseThrow(() -> new RestconfDocumentedException(qname + " refers to an unknown module",
326                     ErrorType.APPLICATION, ErrorTag.INVALID_VALUE));
327             final var stmt = module.findSchemaTreeNode(qname)
328                 .orElseThrow(() -> new RestconfDocumentedException(qname + " refers to an unknown notification",
329                     ErrorType.APPLICATION, ErrorTag.INVALID_VALUE));
330             if (!(stmt instanceof NotificationEffectiveStatement)) {
331                 throw new RestconfDocumentedException(qname + " refers to a non-notification",
332                     ErrorType.APPLICATION, ErrorTag.INVALID_VALUE);
333             }
334
335             if (haveFirst) {
336                 description.append(",\n");
337             } else {
338                 haveFirst = true;
339             }
340             description.append("\n  ")
341                 .append(module.argument().getLocalName()).append(':').append(qname.getLocalName());
342         }
343         description.append("\n}");
344
345         return createStream(description.toString(), baseStreamLocation(baseURI),
346             new NotificationSource(databindProvider, notificationService, qnames))
347             .transform(stream -> Optional.of(Builders.containerBuilder()
348                 .withNodeIdentifier(SAL_REMOTE_OUTPUT_NODEID)
349                 .withChild(ImmutableNodes.leafNode(STREAM_NAME_NODEID, stream.name()))
350                 .build()));
351     }
352
353     /**
354      * Create device notification stream.
355      *
356      * @param input RPC input
357      * @return {@link DOMRpcResult} - Output of RPC - example in JSON
358      */
359     // FIXME: this should be an RPC invocation
360     public final RestconfFuture<Optional<ContainerNode>> createDeviceNotificationStream(final URI baseURI,
361             final ContainerNode input, final EffectiveModelContext modelContext) {
362         // parsing out of container with settings and path
363         // FIXME: ugly cast
364         final var path = (YangInstanceIdentifier) input.findChildByArg(DEVICE_NOTIFICATION_PATH_NODEID)
365                 .map(DataContainerChild::body)
366                 .orElseThrow(() -> new RestconfDocumentedException("No path specified", ErrorType.APPLICATION,
367                     ErrorTag.DATA_MISSING));
368
369         if (!(path.getLastPathArgument() instanceof NodeIdentifierWithPredicates listId)) {
370             throw new RestconfDocumentedException("Path does not refer to a list item", ErrorType.APPLICATION,
371                 ErrorTag.INVALID_VALUE);
372         }
373         if (listId.size() != 1) {
374             throw new RestconfDocumentedException("Target list uses multiple keys", ErrorType.APPLICATION,
375                 ErrorTag.INVALID_VALUE);
376         }
377
378         final var baseStreamsUri = baseStreamLocation(baseURI);
379         return createStream(
380             "All YANG notifications occuring on mount point /" + IdentifierCodec.serialize(path, modelContext),
381             baseStreamsUri,
382             new DeviceNotificationSource(mountPointService, path))
383             .transform(stream -> Optional.of(Builders.containerBuilder()
384                 .withNodeIdentifier(new NodeIdentifier(SubscribeDeviceNotificationOutput.QNAME))
385                 .withChild(ImmutableNodes.leafNode(DEVICE_NOTIFICATION_STREAM_PATH_NODEID,
386                     baseStreamsUri + '/' + stream.name()))
387                 .build()));
388     }
389
390     /**
391      * Prepare {@link YangInstanceIdentifier} of stream source.
392      *
393      * @param data Container with stream settings (RPC create-stream).
394      * @return Parsed {@link YangInstanceIdentifier} of data element from which the data-change-event notifications
395      *         are going to be generated.
396      */
397     private static YangInstanceIdentifier preparePath(final ContainerNode data) {
398         final var pathLeaf = data.childByArg(PATH_NODEID);
399         if (pathLeaf != null && pathLeaf.body() instanceof YangInstanceIdentifier pathValue) {
400             return pathValue;
401         }
402
403         throw new RestconfDocumentedException("Instance identifier was not normalized correctly",
404             ErrorType.APPLICATION, ErrorTag.OPERATION_FAILED);
405     }
406
407     private static @Nullable String extractStringLeaf(final ContainerNode data, final NodeIdentifier childName) {
408         return data.childByArg(childName) instanceof LeafNode<?> leafNode && leafNode.body() instanceof String str
409             ? str : null;
410     }
411
412     @VisibleForTesting
413     static @NonNull MapEntryNode streamEntry(final String name, final String description,
414             final String baseStreamLocation, final Set<EncodingName> encodings) {
415         final var accessBuilder = Builders.mapBuilder().withNodeIdentifier(new NodeIdentifier(Access.QNAME));
416         for (var encoding : encodings) {
417             final var encodingName = encoding.name();
418             accessBuilder.withChild(Builders.mapEntryBuilder()
419                 .withNodeIdentifier(NodeIdentifierWithPredicates.of(Access.QNAME, ENCODING_QNAME, encodingName))
420                 .withChild(ImmutableNodes.leafNode(ENCODING_QNAME, encodingName))
421                 .withChild(ImmutableNodes.leafNode(LOCATION_QNAME,
422                     baseStreamLocation + '/' + encodingName + '/' + name))
423                 .build());
424         }
425
426         return Builders.mapEntryBuilder()
427             .withNodeIdentifier(NodeIdentifierWithPredicates.of(Stream.QNAME, NAME_QNAME, name))
428             .withChild(ImmutableNodes.leafNode(NAME_QNAME, name))
429             .withChild(ImmutableNodes.leafNode(DESCRIPTION_QNAME, description))
430             .withChild(accessBuilder.build())
431             .build();
432     }
433 }