Introduce restconf.server.{api,spi,mdsal}
[netconf.git] / restconf / restconf-nb / src / main / java / org / opendaylight / restconf / nb / rfc8040 / streams / WebSocketFactory.java
index dc8bf337edef067bd81072489e5f7dee99024eaa..21a1bccad811502b9984b29dec82218c839da519 100644 (file)
@@ -9,12 +9,13 @@ package org.opendaylight.restconf.nb.rfc8040.streams;
 
 import static java.util.Objects.requireNonNull;
 
-import java.util.concurrent.ScheduledExecutorService;
 import javax.servlet.http.HttpServletResponse;
 import org.eclipse.jetty.websocket.servlet.ServletUpgradeRequest;
 import org.eclipse.jetty.websocket.servlet.ServletUpgradeResponse;
 import org.eclipse.jetty.websocket.servlet.WebSocketCreator;
 import org.opendaylight.restconf.nb.rfc8040.URLConstants;
+import org.opendaylight.restconf.server.spi.RestconfStream;
+import org.opendaylight.restconf.server.spi.RestconfStream.EncodingName;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -27,8 +28,8 @@ import org.slf4j.LoggerFactory;
  * @param heartbeatInterval     Interval in milliseconds between sending of ping control frames.
  */
 record WebSocketFactory(
-        ScheduledExecutorService executorService,
-        ListenersBroker listenersBroker,
+        RestconfStream.Registry streamRegistry,
+        PingExecutor pingExecutor,
         int maximumFragmentLength,
         int heartbeatInterval) implements WebSocketCreator {
     private static final Logger LOG = LoggerFactory.getLogger(WebSocketFactory.class);
@@ -36,8 +37,8 @@ record WebSocketFactory(
         "/" + URLConstants.BASE_PATH + "/" + URLConstants.STREAMS_SUBPATH + "/";
 
     WebSocketFactory {
-        requireNonNull(executorService);
-        requireNonNull(listenersBroker);
+        requireNonNull(pingExecutor);
+        requireNonNull(streamRegistry);
     }
 
     /**
@@ -53,23 +54,39 @@ record WebSocketFactory(
     @Override
     public Object createWebSocket(final ServletUpgradeRequest req, final ServletUpgradeResponse resp) {
         final var path = req.getRequestURI().getPath();
-        if (path.startsWith(STREAMS_PREFIX)) {
-            final var streamName = path.substring(STREAMS_PREFIX.length());
-            final var listener = listenersBroker.listenerFor(streamName);
-            if (listener != null) {
-                LOG.debug("Listener for stream with name {} has been found, web-socket session handler will be created",
-                    streamName);
-                resp.setSuccess(true);
-                resp.setStatusCode(HttpServletResponse.SC_SWITCHING_PROTOCOLS);
-                // note: every web-socket manages PING process individually because this approach scales better than
-                //       sending PING frames at once over all web-socket sessions
-                return new WebSocketSessionHandler(executorService, listener, maximumFragmentLength, heartbeatInterval);
-            }
+        if (!path.startsWith(STREAMS_PREFIX)) {
+            LOG.debug("Request path '{}' does not start with '{}'", path, STREAMS_PREFIX);
+            return notFound(resp);
+        }
 
+        final var stripped = path.substring(STREAMS_PREFIX.length());
+        final int slash = stripped.indexOf('/');
+        if (slash < 0) {
+            LOG.debug("Request path '{}' does not contain encoding", path);
+            return notFound(resp);
+        }
+        if (slash == 0) {
+            LOG.debug("Request path '{}' contains empty encoding", path);
+            return notFound(resp);
+        }
+        final var streamName = stripped.substring(slash + 1);
+        final var stream = streamRegistry.lookupStream(streamName);
+        if (stream == null) {
             LOG.debug("Listener for stream with name {} was not found.", streamName);
-        } else {
-            LOG.debug("Request path '{}' does not start with '{}'", path, STREAMS_PREFIX);
+            return notFound(resp);
         }
+
+        LOG.debug("Listener for stream with name {} has been found, web-socket session handler will be created",
+            streamName);
+        resp.setSuccess(true);
+        resp.setStatusCode(HttpServletResponse.SC_SWITCHING_PROTOCOLS);
+        // note: every web-socket manages PING process individually because this approach scales better than
+        //       sending PING frames at once over all web-socket sessions
+        return new WebSocketSender(pingExecutor, stream, new EncodingName(stripped.substring(0, slash)),
+            null, maximumFragmentLength, heartbeatInterval);
+    }
+
+    private static Object notFound(final ServletUpgradeResponse resp) {
         resp.setSuccess(false);
         resp.setStatusCode(HttpServletResponse.SC_NOT_FOUND);
         return null;