2 * Copyright (c) 2023 PANTHEON.tech, s.r.o. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.restconf.server.spi;
10 import static java.util.Objects.requireNonNull;
12 import com.google.common.annotations.Beta;
13 import com.google.common.util.concurrent.FutureCallback;
14 import com.google.common.util.concurrent.Futures;
15 import com.google.common.util.concurrent.ListenableFuture;
16 import com.google.common.util.concurrent.MoreExecutors;
18 import java.util.UUID;
19 import java.util.concurrent.ConcurrentHashMap;
20 import java.util.concurrent.ConcurrentMap;
21 import java.util.concurrent.atomic.AtomicInteger;
22 import org.eclipse.jdt.annotation.NonNull;
23 import org.eclipse.jdt.annotation.NonNullByDefault;
24 import org.eclipse.jdt.annotation.Nullable;
25 import org.opendaylight.netconf.databind.RequestException;
26 import org.opendaylight.restconf.server.api.ServerRequest;
27 import org.opendaylight.restconf.server.api.TransportSession;
28 import org.opendaylight.restconf.server.spi.RestconfStream.Source;
29 import org.opendaylight.restconf.server.spi.RestconfStream.Subscription;
30 import org.opendaylight.restconf.server.spi.RestconfStream.SubscriptionFilter;
31 import org.opendaylight.restconf.server.spi.RestconfStream.SubscriptionState;
32 import org.opendaylight.yangtools.yang.common.Empty;
33 import org.opendaylight.yangtools.yang.common.ErrorTag;
34 import org.opendaylight.yangtools.yang.common.ErrorType;
35 import org.opendaylight.yangtools.yang.common.QName;
36 import org.opendaylight.yangtools.yang.common.Uint32;
37 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
38 import org.opendaylight.yangtools.yang.data.api.schema.AnydataNode;
39 import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
40 import org.slf4j.Logger;
41 import org.slf4j.LoggerFactory;
44 * Reference base class for {@link RestconfStream.Registry} implementations.
46 public abstract class AbstractRestconfStreamRegistry implements RestconfStream.Registry {
48 * Default NETCONF stream. We follow
49 * <a href="https://www.rfc-editor.org/rfc/rfc8040#section-6.3.1">RFC 8040</a>.
51 private static final String DEFAULT_STREAM_NAME = "NETCONF";
52 private static final String DEFAULT_STREAM_DESCRIPTION = "Default XML encoded NETCONF stream";
55 * An Event Stream Filter.
59 public interface EventStreamFilter {
61 boolean test(YangInstanceIdentifier path, ContainerNode body);
64 private final class SubscriptionImpl extends AbstractRestconfStreamSubscription {
65 SubscriptionImpl(final Uint32 id, final QName encoding, final String streamName, final String receiverName,
66 final SubscriptionState state, final TransportSession session,
67 final @Nullable EventStreamFilter filter) {
68 super(id, encoding, streamName, receiverName, state, session, filter);
72 protected void terminateImpl(final ServerRequest<Empty> request, final QName reason) {
73 subscriptions.remove(id(), this);
74 request.completeWith(Empty.value());
78 public void channelClosed() {
79 subscriptions.remove(id());
83 private static final Logger LOG = LoggerFactory.getLogger(AbstractRestconfStreamRegistry.class);
86 * Previous dynamic subscription ID. We follow
87 * <a href="https://www.rfc-editor.org/rfc/rfc8639.html#section-6>Implementation Considerations</a> here:
90 * A best practice is to use the lower half of the "id"
91 * object's integer space when that "id" is assigned by an external
92 * entity (such as with a configured subscription). This leaves the
93 * upper half of the subscription integer space available to be
94 * dynamically assigned by the publisher.
97 private final AtomicInteger prevDynamicId = new AtomicInteger(Integer.MAX_VALUE);
98 private final ConcurrentMap<String, RestconfStream<?>> streams = new ConcurrentHashMap<>();
99 private final ConcurrentMap<Uint32, Subscription> subscriptions = new ConcurrentHashMap<>();
100 private final ConcurrentMap<String, EventStreamFilter> filters = new ConcurrentHashMap<>();
103 public final RestconfStream<?> lookupStream(final String name) {
104 return streams.get(requireNonNull(name));
108 public final <T> void createStream(final ServerRequest<RestconfStream<T>> request, final URI restconfURI,
109 final RestconfStream.Source<T> source, final String description) {
110 final var stream = allocateStream(source);
111 final var name = stream.name();
112 if (description.isBlank()) {
113 throw new IllegalArgumentException("Description must be descriptive");
116 Futures.addCallback(putStream(stream, description, restconfURI), new FutureCallback<>() {
118 public void onSuccess(final Void result) {
119 LOG.debug("Stream {} added", name);
120 request.completeWith(stream);
124 public void onFailure(final Throwable cause) {
125 LOG.debug("Failed to add stream {}", name, cause);
126 streams.remove(name, stream);
127 request.completeWith(new RequestException("Failed to create stream " + name, cause));
129 }, MoreExecutors.directExecutor());
133 @Deprecated(since = "9.0.0", forRemoval = true)
134 public <T> void createLegacyStream(final ServerRequest<RestconfStream<T>> request, final URI restconfURI,
135 final Source<T> source, final String description) {
136 createStream(request, restconfURI, source, description);
140 * Create default {@link RestconfStream} with a predefined name.
142 * <p>This method will create the corresponding instance and register it.
144 * @param <T> Stream type
145 * @param source Stream instance
146 * @throws NullPointerException if any argument is {@code null}
148 protected final <T> void start(final Source<T> source) {
149 final var stream = new RestconfStream<>(this, source, DEFAULT_STREAM_NAME);
150 streams.put(DEFAULT_STREAM_NAME, stream);
151 Futures.addCallback(putStream(stream, DEFAULT_STREAM_DESCRIPTION, null), new FutureCallback<>() {
153 public void onSuccess(final Void result) {
154 LOG.debug("Default stream {} added", DEFAULT_STREAM_NAME);
158 public void onFailure(final Throwable cause) {
159 LOG.debug("Failed to add default stream {}", DEFAULT_STREAM_NAME, cause);
160 streams.remove(DEFAULT_STREAM_NAME, stream);
162 }, MoreExecutors.directExecutor());
165 private <T> RestconfStream<T> allocateStream(final Source<T> source) {
167 RestconfStream<T> stream;
169 // Use Type 4 (random) UUID. While we could just use it as a plain string, be nice to observers and anchor
170 // it into UUID URN namespace as defined by RFC4122
171 name = "urn:uuid:" + UUID.randomUUID().toString();
172 stream = new RestconfStream<>(this, source, name);
173 } while (streams.putIfAbsent(name, stream) != null);
178 protected abstract @NonNull ListenableFuture<Void> putStream(@NonNull RestconfStream<?> stream,
179 @NonNull String description, @Nullable URI restconfURI);
182 * Remove a particular stream and remove its entry from operational datastore.
184 * @param stream Stream to remove
186 final void removeStream(final RestconfStream<?> stream) {
187 // Defensive check to see if we are still tracking the stream
188 final var name = stream.name();
189 if (streams.get(name) != stream) {
190 LOG.warn("Stream {} does not match expected instance {}, skipping datastore update", name, stream);
194 Futures.addCallback(deleteStream(name), new FutureCallback<>() {
196 public void onSuccess(final Void result) {
197 LOG.debug("Stream {} removed", name);
198 streams.remove(name, stream);
202 public void onFailure(final Throwable cause) {
203 LOG.warn("Failed to remove stream {}, operational datastore may be inconsistent", name, cause);
204 streams.remove(name, stream);
206 }, MoreExecutors.directExecutor());
209 protected abstract @NonNull ListenableFuture<Void> deleteStream(@NonNull String streamName);
212 public final Subscription lookupSubscription(final Uint32 id) {
213 return subscriptions.get(requireNonNull(id));
217 public final void establishSubscription(final ServerRequest<Subscription> request, final String streamName,
218 final QName encoding, final @Nullable SubscriptionFilter filter) {
219 final var stream = lookupStream(streamName);
220 if (stream == null) {
221 request.completeWith(new RequestException(ErrorType.APPLICATION, ErrorTag.INVALID_VALUE,
222 "%s refers to an unknown stream", streamName));
226 final EventStreamFilter filterImpl;
228 filterImpl = resolveFilter(filter);
229 } catch (RequestException e) {
230 request.completeWith(e);
234 final var principal = request.principal();
235 final var id = Uint32.fromIntBits(prevDynamicId.incrementAndGet());
236 final var subscription = new SubscriptionImpl(id, encoding, streamName,
237 // FIXME: 'anonymous' instead of 'unknown' ?
238 principal != null ? principal.getName() : "<unknown>",
239 SubscriptionState.START, request.session(),
242 Futures.addCallback(createSubscription(subscription), new FutureCallback<Subscription>() {
244 public void onSuccess(final Subscription result) {
245 subscriptions.put(id, result);
246 request.completeWith(result);
250 public void onFailure(final Throwable cause) {
251 request.completeWith(new RequestException(cause));
253 }, MoreExecutors.directExecutor());
257 public void modifySubscription(final ServerRequest<Subscription> request, final Uint32 id,
258 final SubscriptionFilter filter) {
259 final var oldSubscription = lookupSubscription(id);
260 if (oldSubscription == null) {
261 request.completeWith(new RequestException(ErrorType.APPLICATION, ErrorTag.BAD_ELEMENT,
262 "There is no subscription with given ID."));
266 final EventStreamFilter filterImpl;
268 filterImpl = resolveFilter(filter);
269 } catch (RequestException e) {
270 request.completeWith(e);
273 final var newSubscription = new SubscriptionImpl(id, oldSubscription.encoding(), oldSubscription.streamName(),
274 oldSubscription.receiverName(), SubscriptionState.ACTIVE, oldSubscription.session(), filterImpl);
276 Futures.addCallback(modifySubscriptionFilter(newSubscription, filter), new FutureCallback<>() {
278 public void onSuccess(final Subscription result) {
279 subscriptions.put(id, result);
280 request.completeWith(result);
284 public void onFailure(final Throwable cause) {
285 request.completeWith(new RequestException(cause));
287 }, MoreExecutors.directExecutor());
291 public void updateSubscriptionState(final Subscription subscription, final SubscriptionState newState) {
292 requireNonNull(subscription);
293 subscription.setState(newState);
294 subscriptions.replace(subscription.id(), subscription);
298 protected abstract ListenableFuture<Subscription> createSubscription(Subscription subscription);
301 protected abstract ListenableFuture<Subscription> modifySubscriptionFilter(Subscription subscription,
302 SubscriptionFilter filter);
304 protected void putFilter(final String name, final EventStreamFilter filter) {
305 filters.put(name, filter);
308 protected void removeFilter(final String name) {
309 filters.remove(name);
312 protected @Nullable EventStreamFilter resolveFilter(final @Nullable SubscriptionFilter filter)
313 throws RequestException {
314 return switch (filter) {
316 case SubscriptionFilter.Reference(var filterName) -> getFilter(filterName);
317 case SubscriptionFilter.SubtreeDefinition(var anydata) -> parseSubtreeFilter(anydata);
318 case SubscriptionFilter.XPathDefinition(final var xpath) -> parseXpathFilter(xpath);
323 private EventStreamFilter getFilter(final String filterName) throws RequestException {
324 final var impl = filters.get(filterName);
328 throw new RequestException(ErrorType.APPLICATION, ErrorTag.INVALID_VALUE,
329 "%s refers to an unknown stream filter", filterName);
333 protected abstract EventStreamFilter parseSubtreeFilter(AnydataNode<?> filter) throws RequestException;
336 private static EventStreamFilter parseXpathFilter(final String xpath) throws RequestException {
337 // TODO: integrate yang-xpath-api and validate the propose xpath
338 // TODO: implement XPath filter evaluation
339 throw new RequestException(ErrorType.APPLICATION, ErrorTag.OPERATION_NOT_SUPPORTED,
340 "XPath filtering not implemented");