Expose streams with all supported encodings
[netconf.git] / restconf / restconf-nb / src / main / java / org / opendaylight / restconf / nb / rfc8040 / streams / RestconfStream.java
1 /*
2  * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.restconf.nb.rfc8040.streams;
9
10 import static java.util.Objects.requireNonNull;
11
12 import com.google.common.base.MoreObjects;
13 import com.google.common.base.MoreObjects.ToStringHelper;
14 import com.google.common.collect.ImmutableMap;
15 import java.io.UnsupportedEncodingException;
16 import java.lang.invoke.MethodHandles;
17 import java.lang.invoke.VarHandle;
18 import java.time.Instant;
19 import java.util.Set;
20 import java.util.regex.Pattern;
21 import javax.xml.xpath.XPathExpressionException;
22 import org.checkerframework.checker.lock.qual.GuardedBy;
23 import org.eclipse.jdt.annotation.NonNull;
24 import org.eclipse.jdt.annotation.Nullable;
25 import org.opendaylight.restconf.nb.rfc8040.ReceiveEventsParams;
26 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.restconf.monitoring.rev170126.restconf.state.streams.stream.Access;
27 import org.opendaylight.yangtools.concepts.Registration;
28 import org.opendaylight.yangtools.yang.model.api.EffectiveModelContext;
29 import org.slf4j.Logger;
30 import org.slf4j.LoggerFactory;
31
32 /**
33  * A RESTCONF notification event stream. Each stream produces a number of events encoded in at least one encoding. The
34  * set of supported encodings is available through {@link #encodings()}.
35  *
36  * @param <T> Type of processed events
37  */
38 public final class RestconfStream<T> {
39     /**
40      * An opinionated view on what values we can produce for {@link Access#getEncoding()}. The name can only be composed
41      * of one or more characters matching {@code [a-zA-Z]}.
42      *
43      * @param name Encoding name, as visible via the stream's {@code access} list
44      */
45     public record EncodingName(@NonNull String name) {
46         private static final Pattern PATTERN = Pattern.compile("[a-zA-Z]+");
47
48         /**
49          * Well-known JSON encoding defined by RFC8040's {@code ietf-restconf-monitoring.yang}.
50          */
51         public static final @NonNull EncodingName RFC8040_JSON = new EncodingName("json");
52         /**
53          * Well-known XML encoding defined by RFC8040's {@code ietf-restconf-monitoring.yang}.
54          */
55         public static final @NonNull EncodingName RFC8040_XML = new EncodingName("xml");
56
57         public EncodingName {
58             if (!PATTERN.matcher(name).matches()) {
59                 throw new IllegalArgumentException("name must match " + PATTERN);
60             }
61         }
62     }
63
64     /**
65      * A sink of events for a {@link RestconfStream}.
66      */
67     public interface Sink<T> {
68         /**
69          * Publish a set of events generated from input data.
70          *
71          * @param modelContext An {@link EffectiveModelContext} used to format the input
72          * @param input Input data
73          * @param now Current time
74          * @throws NullPointerException if any argument is {@code null}
75          */
76         void publish(EffectiveModelContext modelContext, T input, Instant now);
77
78         /**
79          * Called when the stream has reached its end.
80          */
81         void endOfStream();
82     }
83
84     /**
85      * A source of events for a {@link RestconfStream}.
86      */
87     public abstract static class Source<T> {
88         // ImmutableMap because it retains iteration order
89         final @NonNull ImmutableMap<EncodingName, ? extends EventFormatterFactory<T>> encodings;
90
91         protected Source(final ImmutableMap<EncodingName, ? extends EventFormatterFactory<T>> encodings) {
92             if (encodings.isEmpty()) {
93                 throw new IllegalArgumentException("A source must support at least one encoding");
94             }
95             this.encodings = encodings;
96         }
97
98         protected abstract @NonNull Registration start(Sink<T> sink);
99
100         @Override
101         public final String toString() {
102             return addToStringAttributes(MoreObjects.toStringHelper(this)).toString();
103         }
104
105         protected ToStringHelper addToStringAttributes(final ToStringHelper helper) {
106             return helper.add("encodings", encodings.keySet());
107         }
108     }
109
110     private static final Logger LOG = LoggerFactory.getLogger(RestconfStream.class);
111     private static final VarHandle SUBSCRIBERS;
112
113     static {
114         try {
115             SUBSCRIBERS = MethodHandles.lookup().findVarHandle(RestconfStream.class, "subscribers", Subscribers.class);
116         } catch (NoSuchFieldException | IllegalAccessException e) {
117             throw new ExceptionInInitializerError(e);
118         }
119     }
120
121     private final @NonNull Sink<T> sink = new Sink<>() {
122         @Override
123         public void publish(final EffectiveModelContext modelContext, final T input, final Instant now) {
124             final var local = acquireSubscribers();
125             if (local != null) {
126                 local.publish(modelContext, input, now);
127             } else {
128                 LOG.debug("Ignoring publish() on terminated stream {}", RestconfStream.this);
129             }
130         }
131
132         @Override
133         public void endOfStream() {
134             // Atomic assertion we are ending plus guarded cleanup
135             final var local = (Subscribers<T>) SUBSCRIBERS.getAndSetRelease(RestconfStream.this, null);
136             if (local != null) {
137                 terminate();
138                 local.endOfStream();
139             }
140         }
141     };
142     private final @NonNull ListenersBroker listenersBroker;
143     private final @NonNull Source<T> source;
144     private final @NonNull String name;
145
146     // Accessed via SUBSCRIBERS, 'null' indicates we have been shut down
147     @SuppressWarnings("unused")
148     private volatile Subscribers<T> subscribers = Subscribers.empty();
149
150     @GuardedBy("this")
151     private Registration registration;
152
153     RestconfStream(final ListenersBroker listenersBroker, final Source<T> source, final String name) {
154         this.listenersBroker = requireNonNull(listenersBroker);
155         this.source = requireNonNull(source);
156         this.name = requireNonNull(name);
157     }
158
159     /**
160      * Get name of stream.
161      *
162      * @return Stream name.
163      */
164     public @NonNull String name() {
165         return name;
166     }
167
168     /**
169      * Get supported {@link EncodingName}s. The set is guaranteed to contain at least one element and does not contain
170      * {@code null}s.
171      *
172      * @return Supported encodings.
173      */
174     @SuppressWarnings("null")
175     @NonNull Set<EncodingName> encodings() {
176         return source.encodings.keySet();
177     }
178
179     /**
180      * Registers {@link StreamSessionHandler} subscriber.
181      *
182      * @param handler SSE or WS session handler.
183      * @param encoding Requested event stream encoding
184      * @param params Reception parameters
185      * @return A new {@link Registration}, or {@code null} if the subscriber cannot be added
186      * @throws NullPointerException if any argument is {@code null}
187      * @throws UnsupportedEncodingException if {@code encoding} is not supported
188      * @throws XPathExpressionException if requested filter is not valid
189      * @throws InvalidArgumentException if the parameters are not supported
190      */
191     @Nullable Registration addSubscriber(final StreamSessionHandler handler, final EncodingName encoding,
192             final ReceiveEventsParams params) throws UnsupportedEncodingException, XPathExpressionException {
193         final var factory = source.encodings.get(requireNonNull(encoding));
194         if (factory == null) {
195             throw new UnsupportedEncodingException("Stream '" + name + "' does not support " + encoding);
196         }
197
198         final var startTime = params.startTime();
199         if (startTime != null) {
200             throw new IllegalArgumentException("Stream " + name + " does not support replay");
201         }
202
203         final var leafNodes = params.leafNodesOnly();
204         final var skipData = params.skipNotificationData();
205         final var changedLeafNodes = params.changedLeafNodesOnly();
206         final var childNodes = params.childNodesOnly();
207
208         final var textParams = new TextParameters(
209             leafNodes != null && leafNodes.value(),
210             skipData != null && skipData.value(),
211             changedLeafNodes != null && changedLeafNodes.value(),
212             childNodes != null && childNodes.value());
213
214         final var filter = params.filter();
215         final var filterValue = filter == null ? null : filter.paramValue();
216         final var formatter = filterValue == null || filterValue.isEmpty() ? factory.getFormatter(textParams)
217             : factory.getFormatter(textParams, filterValue);
218
219
220         // Lockless add of a subscriber. If we observe a null this stream is dead before the new subscriber could be
221         // added.
222         final var toAdd = new Subscriber<>(this, handler, formatter);
223         var observed = acquireSubscribers();
224         while (observed != null) {
225             final var next = observed.add(toAdd);
226             final var witness = (Subscribers<T>) SUBSCRIBERS.compareAndExchangeRelease(this, observed, next);
227             if (witness == observed) {
228                 LOG.debug("Subscriber {} is added.", handler);
229                 if (observed instanceof Subscribers.Empty) {
230                     // We have became non-empty, start the source
231                     startSource();
232                 }
233                 return toAdd;
234             }
235
236             // We have raced: retry the operation
237             observed = witness;
238         }
239
240         return null;
241     }
242
243     /**
244      * Removes a {@link Subscriber}. If this was the last subscriber also shut down this stream and initiate its removal
245      * from global state.
246      *
247      * @param subscriber The {@link Subscriber} to remove
248      * @throws NullPointerException if {@code subscriber} is {@code null}
249      */
250     void removeSubscriber(final Subscriber<T> subscriber) {
251         final var toRemove = requireNonNull(subscriber);
252         var observed = acquireSubscribers();
253         while (observed != null) {
254             final var next = observed.remove(toRemove);
255             final var witness = (Subscribers<T>) SUBSCRIBERS.compareAndExchangeRelease(this, observed, next);
256             if (witness == observed) {
257                 LOG.debug("Subscriber {} is removed", subscriber);
258                 if (next == null) {
259                     // We have lost the last subscriber, terminate.
260                     terminate();
261                 }
262                 return;
263             }
264
265             // We have raced: retry the operation
266             observed = witness;
267         }
268     }
269
270     private Subscribers<T> acquireSubscribers() {
271         return (Subscribers<T>) SUBSCRIBERS.getAcquire(this);
272     }
273
274     private void startSource() {
275         // We have not started the stream yet, make sure that happens. This is a bit more involved, as the source may
276         // immediately issue endOfStream(), which in turn invokes terminate(). But at that point start() has not return
277         // and therefore registration is still null -- and thus we need to see if we are still on-line.
278         final var reg = source.start(sink);
279         synchronized (this) {
280             if (acquireSubscribers() == null) {
281                 reg.close();
282             } else {
283                 registration = reg;
284             }
285         }
286     }
287
288     private void terminate() {
289         synchronized (this) {
290             if (registration != null) {
291                 registration.close();
292                 registration = null;
293             }
294         }
295         listenersBroker.removeStream(this);
296     }
297
298     @Override
299     public String toString() {
300         return MoreObjects.toStringHelper(this).add("name", name).add("source", source).toString();
301     }
302 }