7c20a4789647d37320faad97af999c9ce7945820
[netconf.git] /
1 /*
2  * Copyright (c) 2023 PANTHEON.tech, s.r.o. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.restconf.server.spi;
9
10 import static java.util.Objects.requireNonNull;
11
12 import com.google.common.annotations.Beta;
13 import com.google.common.util.concurrent.FutureCallback;
14 import com.google.common.util.concurrent.Futures;
15 import com.google.common.util.concurrent.ListenableFuture;
16 import com.google.common.util.concurrent.MoreExecutors;
17 import java.net.URI;
18 import java.util.UUID;
19 import java.util.concurrent.ConcurrentHashMap;
20 import java.util.concurrent.ConcurrentMap;
21 import java.util.concurrent.atomic.AtomicInteger;
22 import org.eclipse.jdt.annotation.NonNull;
23 import org.eclipse.jdt.annotation.NonNullByDefault;
24 import org.eclipse.jdt.annotation.Nullable;
25 import org.opendaylight.netconf.databind.RequestException;
26 import org.opendaylight.restconf.server.api.ServerRequest;
27 import org.opendaylight.restconf.server.api.TransportSession;
28 import org.opendaylight.restconf.server.spi.RestconfStream.Source;
29 import org.opendaylight.restconf.server.spi.RestconfStream.Subscription;
30 import org.opendaylight.restconf.server.spi.RestconfStream.SubscriptionFilter;
31 import org.opendaylight.restconf.server.spi.RestconfStream.SubscriptionState;
32 import org.opendaylight.yangtools.yang.common.Empty;
33 import org.opendaylight.yangtools.yang.common.ErrorTag;
34 import org.opendaylight.yangtools.yang.common.ErrorType;
35 import org.opendaylight.yangtools.yang.common.QName;
36 import org.opendaylight.yangtools.yang.common.Uint32;
37 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
38 import org.opendaylight.yangtools.yang.data.api.schema.AnydataNode;
39 import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
40 import org.slf4j.Logger;
41 import org.slf4j.LoggerFactory;
42
43 /**
44  * Reference base class for {@link RestconfStream.Registry} implementations.
45  */
46 public abstract class AbstractRestconfStreamRegistry implements RestconfStream.Registry {
47     /**
48      * Default NETCONF stream. We follow
49      * <a href="https://www.rfc-editor.org/rfc/rfc8040#section-6.3.1">RFC 8040</a>.
50      */
51     private static final String DEFAULT_STREAM_NAME = "NETCONF";
52     private static final String DEFAULT_STREAM_DESCRIPTION = "Default XML encoded NETCONF stream";
53
54     /**
55      * An Event Stream Filter.
56      */
57     @Beta
58     @NonNullByDefault
59     public interface EventStreamFilter {
60
61         boolean test(YangInstanceIdentifier path, ContainerNode body);
62     }
63
64     private final class SubscriptionImpl extends AbstractRestconfStreamSubscription {
65         SubscriptionImpl(final Uint32 id, final QName encoding, final String streamName, final String receiverName,
66                 final SubscriptionState state, final TransportSession session,
67                 final @Nullable EventStreamFilter filter) {
68             super(id, encoding, streamName, receiverName, state, session, filter);
69         }
70
71         @Override
72         protected void terminateImpl(final ServerRequest<Empty> request, final QName reason) {
73             subscriptions.remove(id(), this);
74             request.completeWith(Empty.value());
75         }
76
77         @Override
78         public void channelClosed() {
79             subscriptions.remove(id());
80         }
81     }
82
83     private static final Logger LOG = LoggerFactory.getLogger(AbstractRestconfStreamRegistry.class);
84
85     /**
86      * Previous dynamic subscription ID. We follow
87      * <a href="https://www.rfc-editor.org/rfc/rfc8639.html#section-6>Implementation Considerations</a> here:
88      *
89      * <blockquote>
90      *   A best practice is to use the lower half of the "id"
91      *   object's integer space when that "id" is assigned by an external
92      *   entity (such as with a configured subscription).  This leaves the
93      *   upper half of the subscription integer space available to be
94      *   dynamically assigned by the publisher.
95      * </blockquote>
96      */
97     private final AtomicInteger prevDynamicId = new AtomicInteger(Integer.MAX_VALUE);
98     private final ConcurrentMap<String, RestconfStream<?>> streams = new ConcurrentHashMap<>();
99     private final ConcurrentMap<Uint32, Subscription> subscriptions = new ConcurrentHashMap<>();
100     private final ConcurrentMap<String, EventStreamFilter> filters = new ConcurrentHashMap<>();
101
102     @Override
103     public final RestconfStream<?> lookupStream(final String name) {
104         return streams.get(requireNonNull(name));
105     }
106
107     @Override
108     public final <T> void createStream(final ServerRequest<RestconfStream<T>> request, final URI restconfURI,
109             final RestconfStream.Source<T> source, final String description) {
110         final var stream = allocateStream(source);
111         final var name = stream.name();
112         if (description.isBlank()) {
113             throw new IllegalArgumentException("Description must be descriptive");
114         }
115
116         Futures.addCallback(putStream(stream, description, restconfURI), new FutureCallback<>() {
117             @Override
118             public void onSuccess(final Void result) {
119                 LOG.debug("Stream {} added", name);
120                 request.completeWith(stream);
121             }
122
123             @Override
124             public void onFailure(final Throwable cause) {
125                 LOG.debug("Failed to add stream {}", name, cause);
126                 streams.remove(name, stream);
127                 request.completeWith(new RequestException("Failed to create stream " + name, cause));
128             }
129         }, MoreExecutors.directExecutor());
130     }
131
132     @Override
133     @Deprecated(since = "9.0.0", forRemoval = true)
134     public <T> void createLegacyStream(final ServerRequest<RestconfStream<T>> request, final URI restconfURI,
135             final Source<T> source, final String description) {
136         createStream(request, restconfURI, source, description);
137     }
138
139     /**
140      * Create default {@link RestconfStream} with a predefined name.
141      *
142      * <p>This method will create the corresponding instance and register it.
143      *
144      * @param <T> Stream type
145      * @param source Stream instance
146      * @throws NullPointerException if any argument is {@code null}
147      */
148     protected final <T> void start(final Source<T> source) {
149         final var stream = new RestconfStream<>(this, source, DEFAULT_STREAM_NAME);
150         streams.put(DEFAULT_STREAM_NAME, stream);
151         Futures.addCallback(putStream(stream, DEFAULT_STREAM_DESCRIPTION, null), new FutureCallback<>() {
152             @Override
153             public void onSuccess(final Void result) {
154                 LOG.debug("Default stream {} added", DEFAULT_STREAM_NAME);
155             }
156
157             @Override
158             public void onFailure(final Throwable cause) {
159                 LOG.debug("Failed to add default stream {}", DEFAULT_STREAM_NAME, cause);
160                 streams.remove(DEFAULT_STREAM_NAME, stream);
161             }
162         }, MoreExecutors.directExecutor());
163     }
164
165     private <T> RestconfStream<T> allocateStream(final Source<T> source) {
166         String name;
167         RestconfStream<T> stream;
168         do {
169             // Use Type 4 (random) UUID. While we could just use it as a plain string, be nice to observers and anchor
170             // it into UUID URN namespace as defined by RFC4122
171             name = "urn:uuid:" + UUID.randomUUID().toString();
172             stream = new RestconfStream<>(this, source, name);
173         } while (streams.putIfAbsent(name, stream) != null);
174
175         return stream;
176     }
177
178     protected abstract @NonNull ListenableFuture<Void> putStream(@NonNull RestconfStream<?> stream,
179         @NonNull String description, @Nullable URI restconfURI);
180
181     /**
182      * Remove a particular stream and remove its entry from operational datastore.
183      *
184      * @param stream Stream to remove
185      */
186     final void removeStream(final RestconfStream<?> stream) {
187         // Defensive check to see if we are still tracking the stream
188         final var name = stream.name();
189         if (streams.get(name) != stream) {
190             LOG.warn("Stream {} does not match expected instance {}, skipping datastore update", name, stream);
191             return;
192         }
193
194         Futures.addCallback(deleteStream(name), new FutureCallback<>() {
195             @Override
196             public void onSuccess(final Void result) {
197                 LOG.debug("Stream {} removed", name);
198                 streams.remove(name, stream);
199             }
200
201             @Override
202             public void onFailure(final Throwable cause) {
203                 LOG.warn("Failed to remove stream {}, operational datastore may be inconsistent", name, cause);
204                 streams.remove(name, stream);
205             }
206         }, MoreExecutors.directExecutor());
207     }
208
209     protected abstract @NonNull ListenableFuture<Void> deleteStream(@NonNull String streamName);
210
211     @Override
212     public final Subscription lookupSubscription(final Uint32 id) {
213         return subscriptions.get(requireNonNull(id));
214     }
215
216     @Override
217     public final void establishSubscription(final ServerRequest<Subscription> request, final String streamName,
218             final QName encoding, final @Nullable SubscriptionFilter filter) {
219         final var stream = lookupStream(streamName);
220         if (stream == null) {
221             request.completeWith(new RequestException(ErrorType.APPLICATION, ErrorTag.INVALID_VALUE,
222                 "%s refers to an unknown stream", streamName));
223             return;
224         }
225
226         final EventStreamFilter filterImpl;
227         try {
228             filterImpl = resolveFilter(filter);
229         } catch (RequestException e) {
230             request.completeWith(e);
231             return;
232         }
233
234         final var principal = request.principal();
235         final var id = Uint32.fromIntBits(prevDynamicId.incrementAndGet());
236         final var subscription = new SubscriptionImpl(id, encoding, streamName,
237             // FIXME: 'anonymous' instead of 'unknown' ?
238             principal != null ? principal.getName() : "<unknown>",
239             SubscriptionState.START, request.session(),
240             filterImpl);
241
242         Futures.addCallback(createSubscription(subscription), new FutureCallback<Subscription>() {
243             @Override
244             public void onSuccess(final Subscription result) {
245                 subscriptions.put(id, result);
246                 request.completeWith(result);
247             }
248
249             @Override
250             public void onFailure(final Throwable cause) {
251                 request.completeWith(new RequestException(cause));
252             }
253         }, MoreExecutors.directExecutor());
254     }
255
256     @Override
257     public void modifySubscription(final ServerRequest<Subscription> request, final Uint32 id,
258             final SubscriptionFilter filter) {
259         final var oldSubscription = lookupSubscription(id);
260         if (oldSubscription == null) {
261             request.completeWith(new RequestException(ErrorType.APPLICATION, ErrorTag.BAD_ELEMENT,
262                 "There is no subscription with given ID."));
263             return;
264         }
265
266         final EventStreamFilter filterImpl;
267         try {
268             filterImpl = resolveFilter(filter);
269         } catch (RequestException e) {
270             request.completeWith(e);
271             return;
272         }
273         final var newSubscription = new SubscriptionImpl(id, oldSubscription.encoding(), oldSubscription.streamName(),
274             oldSubscription.receiverName(), SubscriptionState.ACTIVE, oldSubscription.session(), filterImpl);
275
276         Futures.addCallback(modifySubscriptionFilter(newSubscription, filter), new FutureCallback<>() {
277             @Override
278             public void onSuccess(final Subscription result) {
279                 subscriptions.put(id, result);
280                 request.completeWith(result);
281             }
282
283             @Override
284             public void onFailure(final Throwable cause) {
285                 request.completeWith(new RequestException(cause));
286             }
287         }, MoreExecutors.directExecutor());
288     }
289
290     @Override
291     public void updateSubscriptionState(final Subscription subscription, final SubscriptionState newState) {
292         requireNonNull(subscription);
293         subscription.setState(newState);
294         subscriptions.replace(subscription.id(), subscription);
295     }
296
297     @NonNullByDefault
298     protected abstract ListenableFuture<Subscription> createSubscription(Subscription subscription);
299
300     @NonNullByDefault
301     protected abstract ListenableFuture<Subscription> modifySubscriptionFilter(Subscription subscription,
302         SubscriptionFilter filter);
303
304     protected void putFilter(final String name, final EventStreamFilter filter) {
305         filters.put(name, filter);
306     }
307
308     protected void removeFilter(final String name) {
309         filters.remove(name);
310     }
311
312     protected @Nullable EventStreamFilter resolveFilter(final @Nullable SubscriptionFilter filter)
313             throws RequestException {
314         return switch (filter) {
315             case null -> null;
316             case SubscriptionFilter.Reference(var filterName) -> getFilter(filterName);
317             case SubscriptionFilter.SubtreeDefinition(var anydata) -> parseSubtreeFilter(anydata);
318             case SubscriptionFilter.XPathDefinition(final var xpath) -> parseXpathFilter(xpath);
319         };
320     }
321
322     @NonNullByDefault
323     private EventStreamFilter getFilter(final String filterName) throws RequestException {
324         final var impl = filters.get(filterName);
325         if (impl != null) {
326             return impl;
327         }
328         throw new RequestException(ErrorType.APPLICATION, ErrorTag.INVALID_VALUE,
329             "%s refers to an unknown stream filter", filterName);
330     }
331
332     @NonNullByDefault
333     protected abstract EventStreamFilter parseSubtreeFilter(AnydataNode<?> filter) throws RequestException;
334
335     @NonNullByDefault
336     private static EventStreamFilter parseXpathFilter(final String xpath) throws RequestException {
337         // TODO: integrate yang-xpath-api and validate the propose xpath
338         // TODO: implement XPath filter evaluation
339         throw new RequestException(ErrorType.APPLICATION, ErrorTag.OPERATION_NOT_SUPPORTED,
340             "XPath filtering not implemented");
341     }
342 }