18495a58c5f357283f2bd74820bb2ac3ab1c28a1
[netconf.git] / restconf / restconf-nb / src / main / java / org / opendaylight / restconf / server / spi / RestconfStream.java
1 /*
2  * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.restconf.server.spi;
9
10 import static java.util.Objects.requireNonNull;
11
12 import com.google.common.base.MoreObjects;
13 import com.google.common.base.MoreObjects.ToStringHelper;
14 import com.google.common.collect.ImmutableMap;
15 import java.io.UnsupportedEncodingException;
16 import java.lang.invoke.MethodHandles;
17 import java.lang.invoke.VarHandle;
18 import java.net.URI;
19 import java.time.Instant;
20 import java.util.Set;
21 import java.util.regex.Pattern;
22 import javax.xml.xpath.XPathExpressionException;
23 import org.checkerframework.checker.lock.qual.GuardedBy;
24 import org.eclipse.jdt.annotation.NonNull;
25 import org.eclipse.jdt.annotation.Nullable;
26 import org.opendaylight.restconf.common.errors.RestconfFuture;
27 import org.opendaylight.restconf.server.api.EventStreamGetParams;
28 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.restconf.monitoring.rev170126.restconf.state.streams.stream.Access;
29 import org.opendaylight.yangtools.concepts.Registration;
30 import org.opendaylight.yangtools.yang.model.api.EffectiveModelContext;
31 import org.slf4j.Logger;
32 import org.slf4j.LoggerFactory;
33
34 /**
35  * A RESTCONF notification event stream. Each stream produces a number of events encoded in at least one encoding. The
36  * set of supported encodings is available through {@link #encodings()}.
37  *
38  * @param <T> Type of processed events
39  */
40 public final class RestconfStream<T> {
41     /**
42      * An opinionated view on what values we can produce for {@link Access#getEncoding()}. The name can only be composed
43      * of one or more characters matching {@code [a-zA-Z]}.
44      *
45      * @param name Encoding name, as visible via the stream's {@code access} list
46      */
47     public record EncodingName(@NonNull String name) {
48         private static final Pattern PATTERN = Pattern.compile("[a-zA-Z]+");
49
50         /**
51          * Well-known JSON encoding defined by RFC8040's {@code ietf-restconf-monitoring.yang}.
52          */
53         public static final @NonNull EncodingName RFC8040_JSON = new EncodingName("json");
54         /**
55          * Well-known XML encoding defined by RFC8040's {@code ietf-restconf-monitoring.yang}.
56          */
57         public static final @NonNull EncodingName RFC8040_XML = new EncodingName("xml");
58
59         public EncodingName {
60             if (!PATTERN.matcher(name).matches()) {
61                 throw new IllegalArgumentException("name must match " + PATTERN);
62             }
63         }
64     }
65
66     /**
67      * A sink of events for a {@link RestconfStream}.
68      */
69     public interface Sink<T> {
70         /**
71          * Publish a set of events generated from input data.
72          *
73          * @param modelContext An {@link EffectiveModelContext} used to format the input
74          * @param input Input data
75          * @param now Current time
76          * @throws NullPointerException if any argument is {@code null}
77          */
78         void publish(EffectiveModelContext modelContext, T input, Instant now);
79
80         /**
81          * Called when the stream has reached its end.
82          */
83         void endOfStream();
84     }
85
86     /**
87      * A source of events for a {@link RestconfStream}.
88      */
89     public abstract static class Source<T> {
90         // ImmutableMap because it retains iteration order
91         final @NonNull ImmutableMap<EncodingName, ? extends EventFormatterFactory<T>> encodings;
92
93         protected Source(final ImmutableMap<EncodingName, ? extends EventFormatterFactory<T>> encodings) {
94             if (encodings.isEmpty()) {
95                 throw new IllegalArgumentException("A source must support at least one encoding");
96             }
97             this.encodings = encodings;
98         }
99
100         protected abstract @NonNull Registration start(Sink<T> sink);
101
102         @Override
103         public final String toString() {
104             return addToStringAttributes(MoreObjects.toStringHelper(this)).toString();
105         }
106
107         protected ToStringHelper addToStringAttributes(final ToStringHelper helper) {
108             return helper.add("encodings", encodings.keySet());
109         }
110     }
111
112     /**
113      * Interface for session handler that is responsible for sending of data over established session.
114      */
115     public interface Sender {
116         /**
117          * Interface for sending String message through one of implementation.
118          *
119          * @param data Message data to be send.
120          */
121         void sendDataMessage(String data);
122
123         /**
124          * Called when the stream has reached its end. The handler should close all underlying resources.
125          */
126         void endOfStream();
127     }
128
129     /**
130      * An entity managing allocation and lookup of {@link RestconfStream}s.
131      */
132     public interface Registry {
133         /**
134          * Get a {@link RestconfStream} by its name.
135          *
136          * @param name Stream name.
137          * @return A {@link RestconfStream}, or {@code null} if the stream with specified name does not exist.
138          * @throws NullPointerException if {@code name} is {@code null}
139          */
140         @Nullable RestconfStream<?> lookupStream(String name);
141
142         /**
143          * Create a {@link RestconfStream} with a unique name. This method will atomically generate a stream name,
144          * create the corresponding instance and register it.
145          *
146          * @param <T> Stream type
147          * @param restconfURI resolved {@code {+restconf}} resource name
148          * @param source Stream instance
149          * @param description Stream descriptiion
150          * @return A future {@link RestconfStream} instance
151          * @throws NullPointerException if any argument is {@code null}
152          */
153         <T> @NonNull RestconfFuture<RestconfStream<T>> createStream(URI restconfURI, Source<T> source,
154             String description);
155     }
156
157     private static final Logger LOG = LoggerFactory.getLogger(RestconfStream.class);
158     private static final VarHandle SUBSCRIBERS;
159
160     static {
161         try {
162             SUBSCRIBERS = MethodHandles.lookup().findVarHandle(RestconfStream.class, "subscribers", Subscribers.class);
163         } catch (NoSuchFieldException | IllegalAccessException e) {
164             throw new ExceptionInInitializerError(e);
165         }
166     }
167
168     private final @NonNull Sink<T> sink = new Sink<>() {
169         @Override
170         public void publish(final EffectiveModelContext modelContext, final T input, final Instant now) {
171             final var local = acquireSubscribers();
172             if (local != null) {
173                 local.publish(modelContext, input, now);
174             } else {
175                 LOG.debug("Ignoring publish() on terminated stream {}", RestconfStream.this);
176             }
177         }
178
179         @Override
180         public void endOfStream() {
181             // Atomic assertion we are ending plus guarded cleanup
182             final var local = (Subscribers<T>) SUBSCRIBERS.getAndSetRelease(RestconfStream.this, null);
183             if (local != null) {
184                 terminate();
185                 local.endOfStream();
186             }
187         }
188     };
189     private final @NonNull AbstractRestconfStreamRegistry registry;
190     private final @NonNull Source<T> source;
191     private final @NonNull String name;
192
193     // Accessed via SUBSCRIBERS, 'null' indicates we have been shut down
194     @SuppressWarnings("unused")
195     private volatile Subscribers<T> subscribers = Subscribers.empty();
196
197     @GuardedBy("this")
198     private Registration registration;
199
200     RestconfStream(final AbstractRestconfStreamRegistry registry, final Source<T> source, final String name) {
201         this.registry = requireNonNull(registry);
202         this.source = requireNonNull(source);
203         this.name = requireNonNull(name);
204     }
205
206     /**
207      * Get name of stream.
208      *
209      * @return Stream name.
210      */
211     public @NonNull String name() {
212         return name;
213     }
214
215     /**
216      * Get supported {@link EncodingName}s. The set is guaranteed to contain at least one element and does not contain
217      * {@code null}s.
218      *
219      * @return Supported encodings.
220      */
221     @SuppressWarnings("null")
222     @NonNull Set<EncodingName> encodings() {
223         return source.encodings.keySet();
224     }
225
226     /**
227      * Registers {@link Sender} subscriber.
228      *
229      * @param handler SSE or WS session handler.
230      * @param encoding Requested event stream encoding
231      * @param params Reception parameters
232      * @return A new {@link Registration}, or {@code null} if the subscriber cannot be added
233      * @throws NullPointerException if any argument is {@code null}
234      * @throws UnsupportedEncodingException if {@code encoding} is not supported
235      * @throws XPathExpressionException if requested filter is not valid
236      */
237     public @Nullable Registration addSubscriber(final Sender handler, final EncodingName encoding,
238             final EventStreamGetParams params) throws UnsupportedEncodingException, XPathExpressionException {
239         final var factory = source.encodings.get(requireNonNull(encoding));
240         if (factory == null) {
241             throw new UnsupportedEncodingException("Stream '" + name + "' does not support " + encoding);
242         }
243
244         final var startTime = params.startTime();
245         if (startTime != null) {
246             throw new IllegalArgumentException("Stream " + name + " does not support replay");
247         }
248
249         final var leafNodes = params.leafNodesOnly() != null && params.leafNodesOnly().value();
250         final var skipData = params.skipNotificationData() != null && params.skipNotificationData().value();
251         final var changedLeafNodes = params.changedLeafNodesOnly() != null && params.changedLeafNodesOnly().value();
252         final var childNodes = params.childNodesOnly() != null && params.childNodesOnly().value();
253
254         final var textParams = new TextParameters(leafNodes, skipData, changedLeafNodes, childNodes);
255
256         final var filter = params.filter();
257         final var filterValue = filter == null ? null : filter.paramValue();
258         final var formatter = filterValue == null || filterValue.isEmpty() ? factory.getFormatter(textParams)
259             : factory.getFormatter(textParams, filterValue);
260
261
262         // Lockless add of a subscriber. If we observe a null this stream is dead before the new subscriber could be
263         // added.
264         final var toAdd = new Subscriber<>(this, handler, formatter);
265         var observed = acquireSubscribers();
266         while (observed != null) {
267             final var next = observed.add(toAdd);
268             final var witness = (Subscribers<T>) SUBSCRIBERS.compareAndExchangeRelease(this, observed, next);
269             if (witness == observed) {
270                 LOG.debug("Subscriber {} is added.", handler);
271                 if (observed instanceof Subscribers.Empty) {
272                     // We have became non-empty, start the source
273                     startSource();
274                 }
275                 return toAdd;
276             }
277
278             // We have raced: retry the operation
279             observed = witness;
280         }
281
282         return null;
283     }
284
285     /**
286      * Removes a {@link Subscriber}. If this was the last subscriber also shut down this stream and initiate its removal
287      * from global state.
288      *
289      * @param subscriber The {@link Subscriber} to remove
290      * @throws NullPointerException if {@code subscriber} is {@code null}
291      */
292     void removeSubscriber(final Subscriber<T> subscriber) {
293         final var toRemove = requireNonNull(subscriber);
294         var observed = acquireSubscribers();
295         while (observed != null) {
296             final var next = observed.remove(toRemove);
297             final var witness = (Subscribers<T>) SUBSCRIBERS.compareAndExchangeRelease(this, observed, next);
298             if (witness == observed) {
299                 LOG.debug("Subscriber {} is removed", subscriber);
300                 if (next == null) {
301                     // We have lost the last subscriber, terminate.
302                     terminate();
303                 }
304                 return;
305             }
306
307             // We have raced: retry the operation
308             observed = witness;
309         }
310     }
311
312     private Subscribers<T> acquireSubscribers() {
313         return (Subscribers<T>) SUBSCRIBERS.getAcquire(this);
314     }
315
316     private void startSource() {
317         // We have not started the stream yet, make sure that happens. This is a bit more involved, as the source may
318         // immediately issue endOfStream(), which in turn invokes terminate(). But at that point start() has not return
319         // and therefore registration is still null -- and thus we need to see if we are still on-line.
320         final var reg = source.start(sink);
321         synchronized (this) {
322             if (acquireSubscribers() == null) {
323                 reg.close();
324             } else {
325                 registration = reg;
326             }
327         }
328     }
329
330     private void terminate() {
331         synchronized (this) {
332             if (registration != null) {
333                 registration.close();
334                 registration = null;
335             }
336         }
337         registry.removeStream(this);
338     }
339
340     @Override
341     public String toString() {
342         return MoreObjects.toStringHelper(this).add("name", name).add("source", source).toString();
343     }
344 }