2 * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.restconf.nb.rfc8040.streams;
10 import static java.util.Objects.requireNonNull;
12 import com.google.common.base.MoreObjects;
13 import com.google.common.base.MoreObjects.ToStringHelper;
14 import com.google.common.collect.ImmutableMap;
15 import java.io.UnsupportedEncodingException;
16 import java.lang.invoke.MethodHandles;
17 import java.lang.invoke.VarHandle;
18 import java.time.Instant;
20 import java.util.regex.Pattern;
21 import javax.xml.xpath.XPathExpressionException;
22 import org.checkerframework.checker.lock.qual.GuardedBy;
23 import org.eclipse.jdt.annotation.NonNull;
24 import org.eclipse.jdt.annotation.Nullable;
25 import org.opendaylight.restconf.nb.rfc8040.ReceiveEventsParams;
26 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.restconf.monitoring.rev170126.restconf.state.streams.stream.Access;
27 import org.opendaylight.yangtools.concepts.Registration;
28 import org.opendaylight.yangtools.yang.model.api.EffectiveModelContext;
29 import org.slf4j.Logger;
30 import org.slf4j.LoggerFactory;
33 * A RESTCONF notification event stream. Each stream produces a number of events encoded in at least one encoding. The
34 * set of supported encodings is available through {@link #encodings()}.
36 * @param <T> Type of processed events
38 public final class RestconfStream<T> {
40 * An opinionated view on what values we can produce for {@link Access#getEncoding()}. The name can only be composed
41 * of one or more characters matching {@code [a-zA-Z]}.
43 * @param name Encoding name, as visible via the stream's {@code access} list
45 public record EncodingName(@NonNull String name) {
46 private static final Pattern PATTERN = Pattern.compile("[a-zA-Z]+");
49 * Well-known JSON encoding defined by RFC8040's {@code ietf-restconf-monitoring.yang}.
51 public static final @NonNull EncodingName RFC8040_JSON = new EncodingName("json");
53 * Well-known XML encoding defined by RFC8040's {@code ietf-restconf-monitoring.yang}.
55 public static final @NonNull EncodingName RFC8040_XML = new EncodingName("xml");
58 if (!PATTERN.matcher(name).matches()) {
59 throw new IllegalArgumentException("name must match " + PATTERN);
65 * A sink of events for a {@link RestconfStream}.
67 public interface Sink<T> {
69 * Publish a set of events generated from input data.
71 * @param modelContext An {@link EffectiveModelContext} used to format the input
72 * @param input Input data
73 * @param now Current time
74 * @throws NullPointerException if any argument is {@code null}
76 void publish(EffectiveModelContext modelContext, T input, Instant now);
79 * Called when the stream has reached its end.
85 * A source of events for a {@link RestconfStream}.
87 public abstract static class Source<T> {
88 // ImmutableMap because it retains iteration order
89 final @NonNull ImmutableMap<EncodingName, ? extends EventFormatterFactory<T>> encodings;
91 protected Source(final ImmutableMap<EncodingName, ? extends EventFormatterFactory<T>> encodings) {
92 if (encodings.isEmpty()) {
93 throw new IllegalArgumentException("A source must support at least one encoding");
95 this.encodings = encodings;
98 protected abstract @NonNull Registration start(Sink<T> sink);
// Renders this source as a string: a Guava ToStringHelper is created for this instance, passed through
// addToStringAttributes() so attributes can be contributed, and then converted to its string form.
// The method is final; addToStringAttributes() is the protected hook that supplies the attributes.
101 public final String toString() {
102 return addToStringAttributes(MoreObjects.toStringHelper(this)).toString();
// Contributes this source's attributes to the supplied helper: a single "encodings" attribute whose value
// is the key set of the encodings map, i.e. the supported encoding names. Returns whatever
// helper.add() returns, so calls can be chained.
105 protected ToStringHelper addToStringAttributes(final ToStringHelper helper) {
106 return helper.add("encodings", encodings.keySet());
110 private static final Logger LOG = LoggerFactory.getLogger(RestconfStream.class);
111 private static final VarHandle SUBSCRIBERS;
115 SUBSCRIBERS = MethodHandles.lookup().findVarHandle(RestconfStream.class, "subscribers", Subscribers.class);
116 } catch (NoSuchFieldException | IllegalAccessException e) {
117 throw new ExceptionInInitializerError(e);
121 private final @NonNull Sink<T> sink = new Sink<>() {
123 public void publish(final EffectiveModelContext modelContext, final T input, final Instant now) {
124 final var local = acquireSubscribers();
126 local.publish(modelContext, input, now);
128 LOG.debug("Ignoring publish() on terminated stream {}", RestconfStream.this);
133 public void endOfStream() {
134 // Atomic assertion we are ending plus guarded cleanup
135 final var local = (Subscribers<T>) SUBSCRIBERS.getAndSetRelease(RestconfStream.this, null);
142 private final @NonNull ListenersBroker listenersBroker;
143 private final @NonNull Source<T> source;
144 private final @NonNull String name;
146 // Accessed via SUBSCRIBERS, 'null' indicates we have been shut down
147 @SuppressWarnings("unused")
148 private volatile Subscribers<T> subscribers = Subscribers.empty();
151 private Registration registration;
// Package-private constructor: stores the broker, the event source and the stream name. Each argument is
// passed through requireNonNull(), so a null argument fails fast with a NullPointerException.
153 RestconfStream(final ListenersBroker listenersBroker, final Source<T> source, final String name) {
154 this.listenersBroker = requireNonNull(listenersBroker);
155 this.source = requireNonNull(source);
156 this.name = requireNonNull(name);
160 * Get name of stream.
162 * @return Stream name.
164 public @NonNull String name() {
169 * Get supported {@link EncodingName}s. The set is guaranteed to contain at least one element and does not contain
172 * @return Supported encodings.
174 @SuppressWarnings("null")
175 @NonNull Set<EncodingName> encodings() {
176 return source.encodings.keySet();
180 * Registers {@link StreamSessionHandler} subscriber.
182 * @param handler SSE or WS session handler.
183 * @param encoding Requested event stream encoding
184 * @param params Reception parameters
185 * @return A new {@link Registration}, or {@code null} if the subscriber cannot be added
186 * @throws NullPointerException if any argument is {@code null}
187 * @throws UnsupportedEncodingException if {@code encoding} is not supported
188 * @throws XPathExpressionException if requested filter is not valid
189 * @throws IllegalArgumentException if the parameters are not supported
191 @Nullable Registration addSubscriber(final StreamSessionHandler handler, final EncodingName encoding,
192 final ReceiveEventsParams params) throws UnsupportedEncodingException, XPathExpressionException {
193 final var factory = source.encodings.get(requireNonNull(encoding));
194 if (factory == null) {
195 throw new UnsupportedEncodingException("Stream '" + name + "' does not support " + encoding);
198 final var startTime = params.startTime();
199 if (startTime != null) {
200 throw new IllegalArgumentException("Stream " + name + " does not support replay");
203 final var leafNodes = params.leafNodesOnly();
204 final var skipData = params.skipNotificationData();
205 final var changedLeafNodes = params.changedLeafNodesOnly();
206 final var childNodes = params.childNodesOnly();
208 final var textParams = new TextParameters(
209 leafNodes != null && leafNodes.value(),
210 skipData != null && skipData.value(),
211 changedLeafNodes != null && changedLeafNodes.value(),
212 childNodes != null && childNodes.value());
214 final var filter = params.filter();
215 final var filterValue = filter == null ? null : filter.paramValue();
216 final var formatter = filterValue == null || filterValue.isEmpty() ? factory.getFormatter(textParams)
217 : factory.getFormatter(textParams, filterValue);
220 // Lockless add of a subscriber. If we observe a null this stream is dead before the new subscriber could be
222 final var toAdd = new Subscriber<>(this, handler, formatter);
223 var observed = acquireSubscribers();
224 while (observed != null) {
225 final var next = observed.add(toAdd);
226 final var witness = (Subscribers<T>) SUBSCRIBERS.compareAndExchangeRelease(this, observed, next);
227 if (witness == observed) {
228 LOG.debug("Subscriber {} is added.", handler);
229 if (observed instanceof Subscribers.Empty) {
230 // We have become non-empty, start the source
236 // We have raced: retry the operation
244 * Removes a {@link Subscriber}. If this was the last subscriber also shut down this stream and initiate its removal
247 * @param subscriber The {@link Subscriber} to remove
248 * @throws NullPointerException if {@code subscriber} is {@code null}
250 void removeSubscriber(final Subscriber<T> subscriber) {
251 final var toRemove = requireNonNull(subscriber);
252 var observed = acquireSubscribers();
253 while (observed != null) {
254 final var next = observed.remove(toRemove);
255 final var witness = (Subscribers<T>) SUBSCRIBERS.compareAndExchangeRelease(this, observed, next);
256 if (witness == observed) {
257 LOG.debug("Subscriber {} is removed", subscriber);
259 // We have lost the last subscriber, terminate.
265 // We have raced: retry the operation
// Reads the current value of the 'subscribers' field through the SUBSCRIBERS VarHandle using acquire
// semantics. As noted on the field declaration, a null value indicates this stream has been shut down,
// so callers must be prepared for a null result.
270 private Subscribers<T> acquireSubscribers() {
271 return (Subscribers<T>) SUBSCRIBERS.getAcquire(this);
274 private void startSource() {
275 // We have not started the stream yet, make sure that happens. This is a bit more involved, as the source may
276 // immediately issue endOfStream(), which in turn invokes terminate(). But at that point start() has not returned
277 // and therefore registration is still null -- and thus we need to see if we are still on-line.
278 final var reg = source.start(sink);
279 synchronized (this) {
280 if (acquireSubscribers() == null) {
288 private void terminate() {
289 synchronized (this) {
290 if (registration != null) {
291 registration.close();
295 listenersBroker.removeStream(this);
// Renders this stream via a Guava ToStringHelper carrying two attributes: "name" (the stream name) and
// "source" (the event source, formatted by its own toString()).
299 public String toString() {
300 return MoreObjects.toStringHelper(this).add("name", name).add("source", source).toString();