Introduce restconf.server.{api,spi,mdsal}
[netconf.git] / restconf / restconf-nb / src / main / java / org / opendaylight / restconf / server / spi / RestconfStream.java
similarity index 83%
rename from restconf/restconf-nb/src/main/java/org/opendaylight/restconf/nb/rfc8040/streams/RestconfStream.java
rename to restconf/restconf-nb/src/main/java/org/opendaylight/restconf/server/spi/RestconfStream.java
index c13317e15d3ee42816e19dbc4241d7bf542da070..fc01ab7880755cef6f5e3246db06942966bfe1a6 100644 (file)
@@ -5,7 +5,7 @@
  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
  * and is available at http://www.eclipse.org/legal/epl-v10.html
  */
-package org.opendaylight.restconf.nb.rfc8040.streams;
+package org.opendaylight.restconf.server.spi;
 
 import static java.util.Objects.requireNonNull;
 
@@ -15,6 +15,7 @@ import com.google.common.collect.ImmutableMap;
 import java.io.UnsupportedEncodingException;
 import java.lang.invoke.MethodHandles;
 import java.lang.invoke.VarHandle;
+import java.net.URI;
 import java.time.Instant;
 import java.util.Set;
 import java.util.regex.Pattern;
@@ -22,6 +23,7 @@ import javax.xml.xpath.XPathExpressionException;
 import org.checkerframework.checker.lock.qual.GuardedBy;
 import org.eclipse.jdt.annotation.NonNull;
 import org.eclipse.jdt.annotation.Nullable;
+import org.opendaylight.restconf.common.errors.RestconfFuture;
 import org.opendaylight.restconf.nb.rfc8040.ReceiveEventsParams;
 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.restconf.monitoring.rev170126.restconf.state.streams.stream.Access;
 import org.opendaylight.yangtools.concepts.Registration;
@@ -107,6 +109,51 @@ public final class RestconfStream<T> {
         }
     }
 
+    /**
+     * Interface for session handler that is responsible for sending of data over established session.
+     */
+    public interface Sender {
+        /**
+         * Interface for sending String message through one of implementation.
+         *
+         * @param data Message data to be send.
+         */
+        void sendDataMessage(String data);
+
+        /**
+         * Called when the stream has reached its end. The handler should close all underlying resources.
+         */
+        void endOfStream();
+    }
+
+    /**
+     * An entity managing allocation and lookup of {@link RestconfStream}s.
+     */
+    public interface Registry {
+        /**
+         * Get a {@link RestconfStream} by its name.
+         *
+         * @param name Stream name.
+         * @return A {@link RestconfStream}, or {@code null} if the stream with specified name does not exist.
+         * @throws NullPointerException if {@code name} is {@code null}
+         */
+        @Nullable RestconfStream<?> lookupStream(String name);
+
+        /**
+         * Create a {@link RestconfStream} with a unique name. This method will atomically generate a stream name,
+         * create the corresponding instance and register it.
+         *
+         * @param <T> Stream type
+         * @param restconfURI resolved {@code {+restconf}} resource name
+         * @param source Stream instance
+         * @param description Stream descriptiion
+         * @return A future {@link RestconfStream} instance
+         * @throws NullPointerException if any argument is {@code null}
+         */
+        <T> @NonNull RestconfFuture<RestconfStream<T>> createStream(URI restconfURI, Source<T> source,
+            String description);
+    }
+
     private static final Logger LOG = LoggerFactory.getLogger(RestconfStream.class);
     private static final VarHandle SUBSCRIBERS;
 
@@ -139,7 +186,7 @@ public final class RestconfStream<T> {
             }
         }
     };
-    private final @NonNull ListenersBroker listenersBroker;
+    private final @NonNull AbstractRestconfStreamRegistry registry;
     private final @NonNull Source<T> source;
     private final @NonNull String name;
 
@@ -150,8 +197,8 @@ public final class RestconfStream<T> {
     @GuardedBy("this")
     private Registration registration;
 
-    RestconfStream(final ListenersBroker listenersBroker, final Source<T> source, final String name) {
-        this.listenersBroker = requireNonNull(listenersBroker);
+    RestconfStream(final AbstractRestconfStreamRegistry registry, final Source<T> source, final String name) {
+        this.registry = requireNonNull(registry);
         this.source = requireNonNull(source);
         this.name = requireNonNull(name);
     }
@@ -177,7 +224,7 @@ public final class RestconfStream<T> {
     }
 
     /**
-     * Registers {@link StreamSessionHandler} subscriber.
+     * Registers {@link Sender} subscriber.
      *
      * @param handler SSE or WS session handler.
      * @param encoding Requested event stream encoding
@@ -186,9 +233,8 @@ public final class RestconfStream<T> {
      * @throws NullPointerException if any argument is {@code null}
      * @throws UnsupportedEncodingException if {@code encoding} is not supported
      * @throws XPathExpressionException if requested filter is not valid
-     * @throws InvalidArgumentException if the parameters are not supported
      */
-    @Nullable Registration addSubscriber(final StreamSessionHandler handler, final EncodingName encoding,
+    public @Nullable Registration addSubscriber(final Sender handler, final EncodingName encoding,
             final ReceiveEventsParams params) throws UnsupportedEncodingException, XPathExpressionException {
         final var factory = source.encodings.get(requireNonNull(encoding));
         if (factory == null) {
@@ -292,7 +338,7 @@ public final class RestconfStream<T> {
                 registration = null;
             }
         }
-        listenersBroker.removeStream(this);
+        registry.removeStream(this);
     }
 
     @Override