* terms of the Eclipse Public License v1.0 which accompanies this distribution,
* and is available at http://www.eclipse.org/legal/epl-v10.html
*/
-package org.opendaylight.restconf.nb.rfc8040.streams;
+package org.opendaylight.restconf.server.spi;
import static java.util.Objects.requireNonNull;
import java.io.UnsupportedEncodingException;
import java.lang.invoke.MethodHandles;
import java.lang.invoke.VarHandle;
+import java.net.URI;
import java.time.Instant;
import java.util.Set;
import java.util.regex.Pattern;
import org.checkerframework.checker.lock.qual.GuardedBy;
import org.eclipse.jdt.annotation.NonNull;
import org.eclipse.jdt.annotation.Nullable;
+import org.opendaylight.restconf.common.errors.RestconfFuture;
import org.opendaylight.restconf.nb.rfc8040.ReceiveEventsParams;
import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.restconf.monitoring.rev170126.restconf.state.streams.stream.Access;
import org.opendaylight.yangtools.concepts.Registration;
}
}
+ /**
+ * Interface for session handler that is responsible for sending of data over established session.
+ */
+ public interface Sender {
+ /**
+ * Interface for sending String message through one of implementation.
+ *
+ * @param data Message data to be send.
+ */
+ void sendDataMessage(String data);
+
+ /**
+ * Called when the stream has reached its end. The handler should close all underlying resources.
+ */
+ void endOfStream();
+ }
+
+ /**
+ * An entity managing allocation and lookup of {@link RestconfStream}s.
+ */
+ public interface Registry {
+ /**
+ * Get a {@link RestconfStream} by its name.
+ *
+ * @param name Stream name.
+ * @return A {@link RestconfStream}, or {@code null} if the stream with specified name does not exist.
+ * @throws NullPointerException if {@code name} is {@code null}
+ */
+ @Nullable RestconfStream<?> lookupStream(String name);
+
+ /**
+ * Create a {@link RestconfStream} with a unique name. This method will atomically generate a stream name,
+ * create the corresponding instance and register it.
+ *
+ * @param <T> Stream type
+ * @param restconfURI resolved {@code {+restconf}} resource name
+ * @param source Stream instance
+ * @param description Stream descriptiion
+ * @return A future {@link RestconfStream} instance
+ * @throws NullPointerException if any argument is {@code null}
+ */
+ <T> @NonNull RestconfFuture<RestconfStream<T>> createStream(URI restconfURI, Source<T> source,
+ String description);
+ }
+
private static final Logger LOG = LoggerFactory.getLogger(RestconfStream.class);
private static final VarHandle SUBSCRIBERS;
}
}
};
- private final @NonNull ListenersBroker listenersBroker;
+ private final @NonNull AbstractRestconfStreamRegistry registry;
private final @NonNull Source<T> source;
private final @NonNull String name;
@GuardedBy("this")
private Registration registration;
- RestconfStream(final ListenersBroker listenersBroker, final Source<T> source, final String name) {
- this.listenersBroker = requireNonNull(listenersBroker);
+ RestconfStream(final AbstractRestconfStreamRegistry registry, final Source<T> source, final String name) {
+ this.registry = requireNonNull(registry);
this.source = requireNonNull(source);
this.name = requireNonNull(name);
}
}
/**
- * Registers {@link StreamSessionHandler} subscriber.
+ * Registers {@link Sender} subscriber.
*
* @param handler SSE or WS session handler.
* @param encoding Requested event stream encoding
* @throws NullPointerException if any argument is {@code null}
* @throws UnsupportedEncodingException if {@code encoding} is not supported
* @throws XPathExpressionException if requested filter is not valid
- * @throws InvalidArgumentException if the parameters are not supported
*/
- @Nullable Registration addSubscriber(final StreamSessionHandler handler, final EncodingName encoding,
+ public @Nullable Registration addSubscriber(final Sender handler, final EncodingName encoding,
final ReceiveEventsParams params) throws UnsupportedEncodingException, XPathExpressionException {
final var factory = source.encodings.get(requireNonNull(encoding));
if (factory == null) {
registration = null;
}
}
- listenersBroker.removeStream(this);
+ registry.removeStream(this);
}
@Override