Clean up YangInstanceIdentifierSerializer
[netconf.git] / restconf / restconf-nb / src / main / java / org / opendaylight / restconf / nb / rfc8040 / streams / WebSocketFactory.java
index 56869b0ebc0e7e880acd56f1094a4b89c3f9d06e..21a1bccad811502b9984b29dec82218c839da519 100644 (file)
@@ -9,12 +9,13 @@ package org.opendaylight.restconf.nb.rfc8040.streams;
 
 import static java.util.Objects.requireNonNull;
 
-import java.util.concurrent.ScheduledExecutorService;
 import javax.servlet.http.HttpServletResponse;
 import org.eclipse.jetty.websocket.servlet.ServletUpgradeRequest;
 import org.eclipse.jetty.websocket.servlet.ServletUpgradeResponse;
 import org.eclipse.jetty.websocket.servlet.WebSocketCreator;
-import org.opendaylight.restconf.nb.rfc8040.streams.listeners.ListenersBroker;
+import org.opendaylight.restconf.nb.rfc8040.URLConstants;
+import org.opendaylight.restconf.server.spi.RestconfStream;
+import org.opendaylight.restconf.server.spi.RestconfStream.EncodingName;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -27,46 +28,67 @@ import org.slf4j.LoggerFactory;
  * @param heartbeatInterval     Interval in milliseconds between sending of ping control frames.
  */
 record WebSocketFactory(
-        ScheduledExecutorService executorService,
-        ListenersBroker listenersBroker,
+        RestconfStream.Registry streamRegistry,
+        PingExecutor pingExecutor,
         int maximumFragmentLength,
         int heartbeatInterval) implements WebSocketCreator {
     private static final Logger LOG = LoggerFactory.getLogger(WebSocketFactory.class);
+    private static final String STREAMS_PREFIX =
+        "/" + URLConstants.BASE_PATH + "/" + URLConstants.STREAMS_SUBPATH + "/";
 
     WebSocketFactory {
-        requireNonNull(executorService);
-        requireNonNull(listenersBroker);
+        requireNonNull(pingExecutor);
+        requireNonNull(streamRegistry);
     }
 
     /**
      * Creation of the new web-socket based on input HTTP/HTTPS upgrade request. Web-socket is created only if the
-     * data listener for input URI can be found (results in status code 101); otherwise status code 404 is set
-     * in upgrade response.
+     * data listener for input URI can be found (results in status code
+     * {@value HttpServletResponse#SC_SWITCHING_PROTOCOLS}); otherwise status code
+     * {@value HttpServletResponse#SC_NOT_FOUND} is set in upgrade response.
      *
-     * @param servletUpgradeRequest  Upgrade request.
-     * @param servletUpgradeResponse Upgrade response.
+     * @param req the request details
+     * @param resp the response details
      * @return Created web-socket instance or {@code null} if the web-socket cannot be created.
      */
     @Override
-    public Object createWebSocket(final ServletUpgradeRequest servletUpgradeRequest,
-            final ServletUpgradeResponse servletUpgradeResponse) {
-        final var streamName = ListenersBroker.createStreamNameFromUri(
-            servletUpgradeRequest.getRequestURI().getRawPath());
+    public Object createWebSocket(final ServletUpgradeRequest req, final ServletUpgradeResponse resp) {
+        final var path = req.getRequestURI().getPath();
+        if (!path.startsWith(STREAMS_PREFIX)) {
+            LOG.debug("Request path '{}' does not start with '{}'", path, STREAMS_PREFIX);
+            return notFound(resp);
+        }
 
-        final var listener = listenersBroker.listenerFor(streamName);
-        if (listener == null) {
+        final var stripped = path.substring(STREAMS_PREFIX.length());
+        final int slash = stripped.indexOf('/');
+        if (slash < 0) {
+            LOG.debug("Request path '{}' does not contain encoding", path);
+            return notFound(resp);
+        }
+        if (slash == 0) {
+            LOG.debug("Request path '{}' contains empty encoding", path);
+            return notFound(resp);
+        }
+        final var streamName = stripped.substring(slash + 1);
+        final var stream = streamRegistry.lookupStream(streamName);
+        if (stream == null) {
             LOG.debug("Listener for stream with name {} was not found.", streamName);
-            servletUpgradeResponse.setSuccess(false);
-            servletUpgradeResponse.setStatusCode(HttpServletResponse.SC_NOT_FOUND);
-            return null;
+            return notFound(resp);
         }
 
         LOG.debug("Listener for stream with name {} has been found, web-socket session handler will be created",
             streamName);
-        servletUpgradeResponse.setSuccess(true);
-        servletUpgradeResponse.setStatusCode(HttpServletResponse.SC_SWITCHING_PROTOCOLS);
-        // note: every web-socket manages PING process individually because this approach scales better than sending
-        //       of PING frames at once over all web-socket sessions
-        return new WebSocketSessionHandler(executorService, listener, maximumFragmentLength, heartbeatInterval);
+        resp.setSuccess(true);
+        resp.setStatusCode(HttpServletResponse.SC_SWITCHING_PROTOCOLS);
+        // note: every web-socket manages PING process individually because this approach scales better than
+        //       sending PING frames at once over all web-socket sessions
+        return new WebSocketSender(pingExecutor, stream, new EncodingName(stripped.substring(0, slash)),
+            null, maximumFragmentLength, heartbeatInterval);
+    }
+
+    private static Object notFound(final ServletUpgradeResponse resp) {
+        resp.setSuccess(false);
+        resp.setStatusCode(HttpServletResponse.SC_NOT_FOUND);
+        return null;
     }
 }
\ No newline at end of file