2 * Copyright (c) 2023 PANTHEON.tech, s.r.o. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.restconf.server.spi;
10 import static java.util.Objects.requireNonNull;
12 import com.google.common.annotations.VisibleForTesting;
13 import com.google.common.util.concurrent.FutureCallback;
14 import com.google.common.util.concurrent.Futures;
15 import com.google.common.util.concurrent.ListenableFuture;
16 import com.google.common.util.concurrent.MoreExecutors;
18 import java.net.URISyntaxException;
20 import java.util.UUID;
21 import java.util.concurrent.ConcurrentHashMap;
22 import java.util.concurrent.ConcurrentMap;
23 import org.eclipse.jdt.annotation.NonNull;
24 import org.eclipse.jdt.annotation.Nullable;
25 import org.opendaylight.restconf.common.errors.RestconfDocumentedException;
26 import org.opendaylight.restconf.common.errors.RestconfFuture;
27 import org.opendaylight.restconf.common.errors.SettableRestconfFuture;
28 import org.opendaylight.restconf.nb.rfc8040.URLConstants;
29 import org.opendaylight.restconf.server.spi.RestconfStream.EncodingName;
30 import org.opendaylight.restconf.server.spi.RestconfStream.Source;
31 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.restconf.monitoring.rev170126.restconf.state.streams.Stream;
32 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.restconf.monitoring.rev170126.restconf.state.streams.stream.Access;
33 import org.opendaylight.yangtools.yang.common.QName;
34 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifier;
35 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifierWithPredicates;
36 import org.opendaylight.yangtools.yang.data.api.schema.MapEntryNode;
37 import org.opendaylight.yangtools.yang.data.impl.schema.Builders;
38 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
39 import org.slf4j.Logger;
40 import org.slf4j.LoggerFactory;
43 * Reference base class for {@link RestconfStream.Registry} implementations.
45 public abstract class AbstractRestconfStreamRegistry implements RestconfStream.Registry {
46 private static final Logger LOG = LoggerFactory.getLogger(AbstractRestconfStreamRegistry.class);
49 public static final QName NAME_QNAME = QName.create(Stream.QNAME, "name").intern();
51 public static final QName DESCRIPTION_QNAME = QName.create(Stream.QNAME, "description").intern();
53 public static final QName ENCODING_QNAME = QName.create(Stream.QNAME, "encoding").intern();
55 public static final QName LOCATION_QNAME = QName.create(Stream.QNAME, "location").intern();
57 private final ConcurrentMap<String, RestconfStream<?>> streams = new ConcurrentHashMap<>();
58 private final boolean useWebsockets;
60 protected AbstractRestconfStreamRegistry(final boolean useWebsockets) {
61 this.useWebsockets = useWebsockets;
65 public final @Nullable RestconfStream<?> lookupStream(final String name) {
66 return streams.get(requireNonNull(name));
70 public final <T> RestconfFuture<RestconfStream<T>> createStream(final URI restconfURI, final Source<T> source,
71 final String description) {
72 final var baseStreamLocation = baseStreamLocation(restconfURI);
73 final var stream = allocateStream(source);
74 final var name = stream.name();
75 if (description.isBlank()) {
76 throw new IllegalArgumentException("Description must be descriptive");
79 final var ret = new SettableRestconfFuture<RestconfStream<T>>();
80 Futures.addCallback(putStream(streamEntry(name, description, baseStreamLocation, stream.encodings())),
81 new FutureCallback<Object>() {
83 public void onSuccess(final Object result) {
84 LOG.debug("Stream {} added", name);
89 public void onFailure(final Throwable cause) {
90 LOG.debug("Failed to add stream {}", name, cause);
91 streams.remove(name, stream);
92 ret.setFailure(new RestconfDocumentedException("Failed to allocate stream " + name, cause));
94 }, MoreExecutors.directExecutor());
98 private <T> RestconfStream<T> allocateStream(final Source<T> source) {
100 RestconfStream<T> stream;
102 // Use Type 4 (random) UUID. While we could just use it as a plain string, be nice to observers and anchor
103 // it into UUID URN namespace as defined by RFC4122
104 name = "urn:uuid:" + UUID.randomUUID().toString();
105 stream = new RestconfStream<>(this, source, name);
106 } while (streams.putIfAbsent(name, stream) != null);
111 protected abstract @NonNull ListenableFuture<?> putStream(@NonNull MapEntryNode stream);
114 * Remove a particular stream and remove its entry from operational datastore.
116 * @param stream Stream to remove
118 final void removeStream(final RestconfStream<?> stream) {
119 // Defensive check to see if we are still tracking the stream
120 final var name = stream.name();
121 if (streams.get(name) != stream) {
122 LOG.warn("Stream {} does not match expected instance {}, skipping datastore update", name, stream);
126 Futures.addCallback(deleteStream(NodeIdentifierWithPredicates.of(Stream.QNAME, NAME_QNAME, name)),
127 new FutureCallback<Object>() {
129 public void onSuccess(final Object result) {
130 LOG.debug("Stream {} removed", name);
131 streams.remove(name, stream);
135 public void onFailure(final Throwable cause) {
136 LOG.warn("Failed to remove stream {}, operational datastore may be inconsistent", name, cause);
137 streams.remove(name, stream);
139 }, MoreExecutors.directExecutor());
142 protected abstract @NonNull ListenableFuture<?> deleteStream(@NonNull NodeIdentifierWithPredicates streamName);
145 * Return the base location URL of the streams service based on request URI.
147 * @param restconfURI request base URI
148 * @throws IllegalArgumentException if the result would have been malformed
150 protected final @NonNull String baseStreamLocation(final URI restconfURI) {
151 var scheme = restconfURI.getScheme();
153 scheme = switch (scheme) {
154 // Secured HTTP goes to Secured WebSockets
155 case "https" -> "wss";
156 // Unsecured HTTP and others go to unsecured WebSockets
162 return new URI(scheme, restconfURI.getRawUserInfo(), restconfURI.getHost(), restconfURI.getPort(),
163 restconfURI.getPath() + '/' + URLConstants.STREAMS_SUBPATH, null, null)
165 } catch (URISyntaxException e) {
166 throw new IllegalArgumentException("Cannot derive streams location", e);
171 public static final @NonNull MapEntryNode streamEntry(final String name, final String description,
172 final String baseStreamLocation, final Set<EncodingName> encodings) {
173 final var accessBuilder = Builders.mapBuilder().withNodeIdentifier(new NodeIdentifier(Access.QNAME));
174 for (var encoding : encodings) {
175 final var encodingName = encoding.name();
176 accessBuilder.withChild(Builders.mapEntryBuilder()
177 .withNodeIdentifier(NodeIdentifierWithPredicates.of(Access.QNAME, ENCODING_QNAME, encodingName))
178 .withChild(ImmutableNodes.leafNode(ENCODING_QNAME, encodingName))
179 .withChild(ImmutableNodes.leafNode(LOCATION_QNAME,
180 baseStreamLocation + '/' + encodingName + '/' + name))
184 return Builders.mapEntryBuilder()
185 .withNodeIdentifier(NodeIdentifierWithPredicates.of(Stream.QNAME, NAME_QNAME, name))
186 .withChild(ImmutableNodes.leafNode(NAME_QNAME, name))
187 .withChild(ImmutableNodes.leafNode(DESCRIPTION_QNAME, description))
188 .withChild(accessBuilder.build())