Refactor ServerRequest
[netconf.git] / restconf / restconf-nb / src / main / java / org / opendaylight / restconf / server / spi / AbstractRestconfStreamRegistry.java
1 /*
2  * Copyright (c) 2023 PANTHEON.tech, s.r.o. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.restconf.server.spi;
9
10 import static java.util.Objects.requireNonNull;
11
12 import com.google.common.annotations.VisibleForTesting;
13 import com.google.common.util.concurrent.FutureCallback;
14 import com.google.common.util.concurrent.Futures;
15 import com.google.common.util.concurrent.ListenableFuture;
16 import com.google.common.util.concurrent.MoreExecutors;
17 import java.net.URI;
18 import java.net.URISyntaxException;
19 import java.util.Set;
20 import java.util.UUID;
21 import java.util.concurrent.ConcurrentHashMap;
22 import java.util.concurrent.ConcurrentMap;
23 import org.eclipse.jdt.annotation.NonNull;
24 import org.eclipse.jdt.annotation.Nullable;
25 import org.opendaylight.restconf.common.errors.RestconfDocumentedException;
26 import org.opendaylight.restconf.nb.rfc8040.URLConstants;
27 import org.opendaylight.restconf.server.api.ServerRequest;
28 import org.opendaylight.restconf.server.spi.RestconfStream.EncodingName;
29 import org.opendaylight.restconf.server.spi.RestconfStream.Source;
30 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.restconf.monitoring.rev170126.restconf.state.streams.Stream;
31 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.restconf.monitoring.rev170126.restconf.state.streams.stream.Access;
32 import org.opendaylight.yangtools.yang.common.QName;
33 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifier;
34 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifierWithPredicates;
35 import org.opendaylight.yangtools.yang.data.api.schema.MapEntryNode;
36 import org.opendaylight.yangtools.yang.data.spi.node.ImmutableNodes;
37 import org.slf4j.Logger;
38 import org.slf4j.LoggerFactory;
39
40 /**
41  * Reference base class for {@link RestconfStream.Registry} implementations.
42  */
43 public abstract class AbstractRestconfStreamRegistry implements RestconfStream.Registry {
44     private static final Logger LOG = LoggerFactory.getLogger(AbstractRestconfStreamRegistry.class);
45
46     @VisibleForTesting
47     public static final QName NAME_QNAME =  QName.create(Stream.QNAME, "name").intern();
48     @VisibleForTesting
49     public static final QName DESCRIPTION_QNAME = QName.create(Stream.QNAME, "description").intern();
50     @VisibleForTesting
51     public static final QName ENCODING_QNAME =  QName.create(Stream.QNAME, "encoding").intern();
52     @VisibleForTesting
53     public static final QName LOCATION_QNAME =  QName.create(Stream.QNAME, "location").intern();
54
55     private final ConcurrentMap<String, RestconfStream<?>> streams = new ConcurrentHashMap<>();
56
57     @Override
58     public final @Nullable RestconfStream<?> lookupStream(final String name) {
59         return streams.get(requireNonNull(name));
60     }
61
62     @Override
63     public final <T> void createStream(final ServerRequest<RestconfStream<T>> request, final URI restconfURI,
64             final Source<T> source, final String description) {
65         final var baseStreamLocation = baseStreamLocation(restconfURI);
66         final var stream = allocateStream(source);
67         final var name = stream.name();
68         if (description.isBlank()) {
69             throw new IllegalArgumentException("Description must be descriptive");
70         }
71
72         Futures.addCallback(putStream(streamEntry(name, description, baseStreamLocation, stream.encodings())),
73             new FutureCallback<Object>() {
74                 @Override
75                 public void onSuccess(final Object result) {
76                     LOG.debug("Stream {} added", name);
77                     request.completeWith(stream);
78                 }
79
80                 @Override
81                 public void onFailure(final Throwable cause) {
82                     LOG.debug("Failed to add stream {}", name, cause);
83                     streams.remove(name, stream);
84                     request.completeWith(new RestconfDocumentedException("Failed to allocate stream " + name, cause));
85                 }
86             }, MoreExecutors.directExecutor());
87     }
88
89     private <T> RestconfStream<T> allocateStream(final Source<T> source) {
90         String name;
91         RestconfStream<T> stream;
92         do {
93             // Use Type 4 (random) UUID. While we could just use it as a plain string, be nice to observers and anchor
94             // it into UUID URN namespace as defined by RFC4122
95             name = "urn:uuid:" + UUID.randomUUID().toString();
96             stream = new RestconfStream<>(this, source, name);
97         } while (streams.putIfAbsent(name, stream) != null);
98
99         return stream;
100     }
101
102     protected abstract @NonNull ListenableFuture<?> putStream(@NonNull MapEntryNode stream);
103
104     /**
105      * Remove a particular stream and remove its entry from operational datastore.
106      *
107      * @param stream Stream to remove
108      */
109     final void removeStream(final RestconfStream<?> stream) {
110         // Defensive check to see if we are still tracking the stream
111         final var name = stream.name();
112         if (streams.get(name) != stream) {
113             LOG.warn("Stream {} does not match expected instance {}, skipping datastore update", name, stream);
114             return;
115         }
116
117         Futures.addCallback(deleteStream(NodeIdentifierWithPredicates.of(Stream.QNAME, NAME_QNAME, name)),
118             new FutureCallback<Object>() {
119                 @Override
120                 public void onSuccess(final Object result) {
121                     LOG.debug("Stream {} removed", name);
122                     streams.remove(name, stream);
123                 }
124
125                 @Override
126                 public void onFailure(final Throwable cause) {
127                     LOG.warn("Failed to remove stream {}, operational datastore may be inconsistent", name, cause);
128                     streams.remove(name, stream);
129                 }
130             }, MoreExecutors.directExecutor());
131     }
132
133     protected abstract @NonNull ListenableFuture<?> deleteStream(@NonNull NodeIdentifierWithPredicates streamName);
134
135     /**
136      * Return the base location URL of the streams service based on request URI.
137      *
138      * @param restconfURI request base URI, with trailing slash
139      * @throws IllegalArgumentException if the result would have been malformed
140      */
141     protected static final @NonNull String baseStreamLocation(final URI restconfURI) {
142         var scheme = restconfURI.getScheme();
143
144         try {
145             return new URI(scheme, restconfURI.getRawUserInfo(), restconfURI.getHost(), restconfURI.getPort(),
146                 restconfURI.getPath() + URLConstants.STREAMS_SUBPATH, null, null)
147                 .toString();
148         } catch (URISyntaxException e) {
149             throw new IllegalArgumentException("Cannot derive streams location", e);
150         }
151     }
152
153     @VisibleForTesting
154     public static final @NonNull MapEntryNode streamEntry(final String name, final String description,
155             final String baseStreamLocation, final Set<EncodingName> encodings) {
156         final var accessBuilder = ImmutableNodes.newSystemMapBuilder()
157             .withNodeIdentifier(new NodeIdentifier(Access.QNAME));
158         for (var encoding : encodings) {
159             final var encodingName = encoding.name();
160             accessBuilder.withChild(ImmutableNodes.newMapEntryBuilder()
161                 .withNodeIdentifier(NodeIdentifierWithPredicates.of(Access.QNAME, ENCODING_QNAME, encodingName))
162                 .withChild(ImmutableNodes.leafNode(ENCODING_QNAME, encodingName))
163                 .withChild(ImmutableNodes.leafNode(LOCATION_QNAME,
164                     baseStreamLocation + '/' + encodingName + '/' + name))
165                 .build());
166         }
167
168         return ImmutableNodes.newMapEntryBuilder()
169             .withNodeIdentifier(NodeIdentifierWithPredicates.of(Stream.QNAME, NAME_QNAME, name))
170             .withChild(ImmutableNodes.leafNode(NAME_QNAME, name))
171             .withChild(ImmutableNodes.leafNode(DESCRIPTION_QNAME, description))
172             .withChild(accessBuilder.build())
173             .build();
174     }
175 }