Eliminate NormalizedNodePayload
[netconf.git] / restconf / restconf-nb / src / main / java / org / opendaylight / restconf / server / spi / RestconfStream.java
1 /*
2  * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.restconf.server.spi;
9
10 import static java.util.Objects.requireNonNull;
11
12 import com.google.common.base.MoreObjects;
13 import com.google.common.base.MoreObjects.ToStringHelper;
14 import com.google.common.collect.ImmutableMap;
15 import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
16 import java.io.UnsupportedEncodingException;
17 import java.lang.invoke.MethodHandles;
18 import java.lang.invoke.VarHandle;
19 import java.net.URI;
20 import java.time.Instant;
21 import java.util.Set;
22 import java.util.regex.Pattern;
23 import javax.xml.xpath.XPathExpressionException;
24 import org.checkerframework.checker.lock.qual.GuardedBy;
25 import org.eclipse.jdt.annotation.NonNull;
26 import org.eclipse.jdt.annotation.Nullable;
27 import org.opendaylight.restconf.common.errors.RestconfFuture;
28 import org.opendaylight.restconf.server.api.EventStreamGetParams;
29 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.restconf.monitoring.rev170126.restconf.state.streams.stream.Access;
30 import org.opendaylight.yangtools.concepts.Registration;
31 import org.opendaylight.yangtools.yang.model.api.EffectiveModelContext;
32 import org.slf4j.Logger;
33 import org.slf4j.LoggerFactory;
34
35 /**
36  * A RESTCONF notification event stream. Each stream produces a number of events encoded in at least one encoding. The
37  * set of supported encodings is available through {@link #encodings()}.
38  *
39  * @param <T> Type of processed events
40  */
41 public final class RestconfStream<T> {
42     /**
43      * An opinionated view on what values we can produce for {@link Access#getEncoding()}. The name can only be composed
44      * of one or more characters matching {@code [a-zA-Z]}.
45      *
46      * @param name Encoding name, as visible via the stream's {@code access} list
47      */
48     public record EncodingName(@NonNull String name) {
49         private static final Pattern PATTERN = Pattern.compile("[a-zA-Z]+");
50
51         /**
52          * Well-known JSON encoding defined by RFC8040's {@code ietf-restconf-monitoring.yang}.
53          */
54         public static final @NonNull EncodingName RFC8040_JSON = new EncodingName("json");
55         /**
56          * Well-known XML encoding defined by RFC8040's {@code ietf-restconf-monitoring.yang}.
57          */
58         public static final @NonNull EncodingName RFC8040_XML = new EncodingName("xml");
59
60         public EncodingName {
61             if (!PATTERN.matcher(name).matches()) {
62                 throw new IllegalArgumentException("name must match " + PATTERN);
63             }
64         }
65     }
66
67     /**
68      * A sink of events for a {@link RestconfStream}.
69      */
70     public interface Sink<T> {
71         /**
72          * Publish a set of events generated from input data.
73          *
74          * @param modelContext An {@link EffectiveModelContext} used to format the input
75          * @param input Input data
76          * @param now Current time
77          * @throws NullPointerException if any argument is {@code null}
78          */
79         void publish(EffectiveModelContext modelContext, T input, Instant now);
80
81         /**
82          * Called when the stream has reached its end.
83          */
84         void endOfStream();
85     }
86
87     /**
88      * A source of events for a {@link RestconfStream}.
89      */
90     public abstract static class Source<T> {
91         // ImmutableMap because it retains iteration order
92         final @NonNull ImmutableMap<EncodingName, ? extends EventFormatterFactory<T>> encodings;
93
94         protected Source(final ImmutableMap<EncodingName, ? extends EventFormatterFactory<T>> encodings) {
95             if (encodings.isEmpty()) {
96                 throw new IllegalArgumentException("A source must support at least one encoding");
97             }
98             this.encodings = encodings;
99         }
100
101         protected abstract @NonNull Registration start(Sink<T> sink);
102
103         @Override
104         public final String toString() {
105             return addToStringAttributes(MoreObjects.toStringHelper(this)).toString();
106         }
107
108         protected ToStringHelper addToStringAttributes(final ToStringHelper helper) {
109             return helper.add("encodings", encodings.keySet());
110         }
111     }
112
113     /**
114      * Interface for session handler that is responsible for sending of data over established session.
115      */
116     public interface Sender {
117         /**
118          * Interface for sending String message through one of implementation.
119          *
120          * @param data Message data to be send.
121          */
122         void sendDataMessage(String data);
123
124         /**
125          * Called when the stream has reached its end. The handler should close all underlying resources.
126          */
127         void endOfStream();
128     }
129
130     /**
131      * An entity managing allocation and lookup of {@link RestconfStream}s.
132      */
133     public interface Registry {
134         /**
135          * Get a {@link RestconfStream} by its name.
136          *
137          * @param name Stream name.
138          * @return A {@link RestconfStream}, or {@code null} if the stream with specified name does not exist.
139          * @throws NullPointerException if {@code name} is {@code null}
140          */
141         @Nullable RestconfStream<?> lookupStream(String name);
142
143         /**
144          * Create a {@link RestconfStream} with a unique name. This method will atomically generate a stream name,
145          * create the corresponding instance and register it.
146          *
147          * @param <T> Stream type
148          * @param restconfURI resolved {@code {+restconf}} resource name
149          * @param source Stream instance
150          * @param description Stream descriptiion
151          * @return A future {@link RestconfStream} instance
152          * @throws NullPointerException if any argument is {@code null}
153          */
154         <T> @NonNull RestconfFuture<RestconfStream<T>> createStream(URI restconfURI, Source<T> source,
155             String description);
156     }
157
158     private static final Logger LOG = LoggerFactory.getLogger(RestconfStream.class);
159     private static final VarHandle SUBSCRIBERS;
160
161     static {
162         try {
163             SUBSCRIBERS = MethodHandles.lookup().findVarHandle(RestconfStream.class, "subscribers", Subscribers.class);
164         } catch (NoSuchFieldException | IllegalAccessException e) {
165             throw new ExceptionInInitializerError(e);
166         }
167     }
168
169     private final @NonNull Sink<T> sink = new Sink<>() {
170         @Override
171         public void publish(final EffectiveModelContext modelContext, final T input, final Instant now) {
172             final var local = acquireSubscribers();
173             if (local != null) {
174                 local.publish(modelContext, input, now);
175             } else {
176                 LOG.debug("Ignoring publish() on terminated stream {}", RestconfStream.this);
177             }
178         }
179
180         @Override
181         public void endOfStream() {
182             // Atomic assertion we are ending plus guarded cleanup
183             final var local = (Subscribers<T>) SUBSCRIBERS.getAndSetRelease(RestconfStream.this, null);
184             if (local != null) {
185                 terminate();
186                 local.endOfStream();
187             }
188         }
189     };
190     private final @NonNull AbstractRestconfStreamRegistry registry;
191     private final @NonNull Source<T> source;
192     private final @NonNull String name;
193
194     // Accessed via SUBSCRIBERS, 'null' indicates we have been shut down
195     @SuppressWarnings("unused")
196     @SuppressFBWarnings(value = "URF_UNREAD_FIELD", justification = "https://github.com/spotbugs/spotbugs/issues/2749")
197     private volatile Subscribers<T> subscribers = Subscribers.empty();
198
199     @GuardedBy("this")
200     private Registration registration;
201
202     RestconfStream(final AbstractRestconfStreamRegistry registry, final Source<T> source, final String name) {
203         this.registry = requireNonNull(registry);
204         this.source = requireNonNull(source);
205         this.name = requireNonNull(name);
206     }
207
208     /**
209      * Get name of stream.
210      *
211      * @return Stream name.
212      */
213     public @NonNull String name() {
214         return name;
215     }
216
217     /**
218      * Get supported {@link EncodingName}s. The set is guaranteed to contain at least one element and does not contain
219      * {@code null}s.
220      *
221      * @return Supported encodings.
222      */
223     @SuppressWarnings("null")
224     @NonNull Set<EncodingName> encodings() {
225         return source.encodings.keySet();
226     }
227
228     /**
229      * Registers {@link Sender} subscriber.
230      *
231      * @param handler SSE or WS session handler.
232      * @param encoding Requested event stream encoding
233      * @param params Reception parameters
234      * @return A new {@link Registration}, or {@code null} if the subscriber cannot be added
235      * @throws NullPointerException if any argument is {@code null}
236      * @throws UnsupportedEncodingException if {@code encoding} is not supported
237      * @throws XPathExpressionException if requested filter is not valid
238      */
239     public @Nullable Registration addSubscriber(final Sender handler, final EncodingName encoding,
240             final EventStreamGetParams params) throws UnsupportedEncodingException, XPathExpressionException {
241         final var factory = source.encodings.get(requireNonNull(encoding));
242         if (factory == null) {
243             throw new UnsupportedEncodingException("Stream '" + name + "' does not support " + encoding);
244         }
245
246         final var startTime = params.startTime();
247         if (startTime != null) {
248             throw new IllegalArgumentException("Stream " + name + " does not support replay");
249         }
250
251         final var leafNodes = params.leafNodesOnly() != null && params.leafNodesOnly().value();
252         final var skipData = params.skipNotificationData() != null && params.skipNotificationData().value();
253         final var changedLeafNodes = params.changedLeafNodesOnly() != null && params.changedLeafNodesOnly().value();
254         final var childNodes = params.childNodesOnly() != null && params.childNodesOnly().value();
255
256         final var textParams = new TextParameters(leafNodes, skipData, changedLeafNodes, childNodes);
257
258         final var filter = params.filter();
259         final var filterValue = filter == null ? null : filter.paramValue();
260         final var formatter = filterValue == null || filterValue.isEmpty() ? factory.getFormatter(textParams)
261             : factory.getFormatter(textParams, filterValue);
262
263
264         // Lockless add of a subscriber. If we observe a null this stream is dead before the new subscriber could be
265         // added.
266         final var toAdd = new Subscriber<>(this, handler, formatter);
267         var observed = acquireSubscribers();
268         while (observed != null) {
269             final var next = observed.add(toAdd);
270             final var witness = (Subscribers<T>) SUBSCRIBERS.compareAndExchangeRelease(this, observed, next);
271             if (witness == observed) {
272                 LOG.debug("Subscriber {} is added.", handler);
273                 if (observed instanceof Subscribers.Empty) {
274                     // We have became non-empty, start the source
275                     startSource();
276                 }
277                 return toAdd;
278             }
279
280             // We have raced: retry the operation
281             observed = witness;
282         }
283
284         return null;
285     }
286
287     /**
288      * Removes a {@link Subscriber}. If this was the last subscriber also shut down this stream and initiate its removal
289      * from global state.
290      *
291      * @param subscriber The {@link Subscriber} to remove
292      * @throws NullPointerException if {@code subscriber} is {@code null}
293      */
294     void removeSubscriber(final Subscriber<T> subscriber) {
295         final var toRemove = requireNonNull(subscriber);
296         var observed = acquireSubscribers();
297         while (observed != null) {
298             final var next = observed.remove(toRemove);
299             final var witness = (Subscribers<T>) SUBSCRIBERS.compareAndExchangeRelease(this, observed, next);
300             if (witness == observed) {
301                 LOG.debug("Subscriber {} is removed", subscriber);
302                 if (next == null) {
303                     // We have lost the last subscriber, terminate.
304                     terminate();
305                 }
306                 return;
307             }
308
309             // We have raced: retry the operation
310             observed = witness;
311         }
312     }
313
314     private Subscribers<T> acquireSubscribers() {
315         return (Subscribers<T>) SUBSCRIBERS.getAcquire(this);
316     }
317
318     private void startSource() {
319         // We have not started the stream yet, make sure that happens. This is a bit more involved, as the source may
320         // immediately issue endOfStream(), which in turn invokes terminate(). But at that point start() has not return
321         // and therefore registration is still null -- and thus we need to see if we are still on-line.
322         final var reg = source.start(sink);
323         synchronized (this) {
324             if (acquireSubscribers() == null) {
325                 reg.close();
326             } else {
327                 registration = reg;
328             }
329         }
330     }
331
332     private void terminate() {
333         synchronized (this) {
334             if (registration != null) {
335                 registration.close();
336                 registration = null;
337             }
338         }
339         registry.removeStream(this);
340     }
341
342     @Override
343     public String toString() {
344         return MoreObjects.toStringHelper(this).add("name", name).add("source", source).toString();
345     }
346 }