Factor out WebSocketFactory 62/108762/1
authorRobert Varga <robert.varga@pantheon.tech>
Mon, 30 Oct 2023 22:33:25 +0000 (23:33 +0100)
committerRobert Varga <robert.varga@pantheon.tech>
Mon, 30 Oct 2023 22:35:37 +0000 (23:35 +0100)
WebSocketFactory is a simple immutable thing. Factor it out of
WebSocketInitializer and turn it into a record.

Also share a single instance for all invocations of
WebSocketInitializer.configure() -- allowing us to ditch a number of
fields.

JIRA: NETCONF-1102
Change-Id: I83b2d74eab081d35367133519bb23df0276ed7a3
Signed-off-by: Robert Varga <robert.varga@pantheon.tech>
restconf/restconf-nb/src/main/java/org/opendaylight/restconf/nb/rfc8040/streams/WebSocketFactory.java [new file with mode: 0644]
restconf/restconf-nb/src/main/java/org/opendaylight/restconf/nb/rfc8040/streams/WebSocketInitializer.java
restconf/restconf-nb/src/test/java/org/opendaylight/restconf/nb/rfc8040/streams/WebSocketFactoryTest.java

diff --git a/restconf/restconf-nb/src/main/java/org/opendaylight/restconf/nb/rfc8040/streams/WebSocketFactory.java b/restconf/restconf-nb/src/main/java/org/opendaylight/restconf/nb/rfc8040/streams/WebSocketFactory.java
new file mode 100644 (file)
index 0000000..56869b0
--- /dev/null
@@ -0,0 +1,72 @@
+/*
+ * Copyright (c) 2023 PANTHEON.tech, s.r.o. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.restconf.nb.rfc8040.streams;
+
+import static java.util.Objects.requireNonNull;
+
+import java.util.concurrent.ScheduledExecutorService;
+import javax.servlet.http.HttpServletResponse;
+import org.eclipse.jetty.websocket.servlet.ServletUpgradeRequest;
+import org.eclipse.jetty.websocket.servlet.ServletUpgradeResponse;
+import org.eclipse.jetty.websocket.servlet.WebSocketCreator;
+import org.opendaylight.restconf.nb.rfc8040.streams.listeners.ListenersBroker;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Factory that is used for creation of new web-sockets based on HTTP/HTTPS upgrade request.
+ *
+ * @param executorService       Executor for creation of threads for controlling of web-socket sessions.
+ * @param maximumFragmentLength Maximum web-socket fragment length in number of Unicode code units (characters)
+ *                              (exceeded message length leads to fragmentation of messages).
+ * @param heartbeatInterval     Interval in milliseconds between sending of ping control frames.
+ */
+record WebSocketFactory(
+        ScheduledExecutorService executorService,
+        ListenersBroker listenersBroker,
+        int maximumFragmentLength,
+        int heartbeatInterval) implements WebSocketCreator {
+    private static final Logger LOG = LoggerFactory.getLogger(WebSocketFactory.class);
+
+    WebSocketFactory {
+        requireNonNull(executorService);
+        requireNonNull(listenersBroker);
+    }
+
+    /**
+     * Creation of the new web-socket based on input HTTP/HTTPS upgrade request. Web-socket is created only if the
+     * data listener for input URI can be found (results in status code 101); otherwise status code 404 is set
+     * in upgrade response.
+     *
+     * @param servletUpgradeRequest  Upgrade request.
+     * @param servletUpgradeResponse Upgrade response.
+     * @return Created web-socket instance or {@code null} if the web-socket cannot be created.
+     */
+    @Override
+    public Object createWebSocket(final ServletUpgradeRequest servletUpgradeRequest,
+            final ServletUpgradeResponse servletUpgradeResponse) {
+        final var streamName = ListenersBroker.createStreamNameFromUri(
+            servletUpgradeRequest.getRequestURI().getRawPath());
+
+        final var listener = listenersBroker.listenerFor(streamName);
+        if (listener == null) {
+            LOG.debug("Listener for stream with name {} was not found.", streamName);
+            servletUpgradeResponse.setSuccess(false);
+            servletUpgradeResponse.setStatusCode(HttpServletResponse.SC_NOT_FOUND);
+            return null;
+        }
+
+        LOG.debug("Listener for stream with name {} has been found, web-socket session handler will be created",
+            streamName);
+        servletUpgradeResponse.setSuccess(true);
+        servletUpgradeResponse.setStatusCode(HttpServletResponse.SC_SWITCHING_PROTOCOLS);
+        // note: every web-socket manages PING process individually because this approach scales better than sending
+        //       of PING frames at once over all web-socket sessions
+        return new WebSocketSessionHandler(executorService, listener, maximumFragmentLength, heartbeatInterval);
+    }
+}
\ No newline at end of file
index 206e57cc5f15601697c3f7e2f92bdc752b5b38c6..5e51c1916a91e5604d67eb2276b02d3cd00c7bc7 100644 (file)
@@ -7,26 +7,16 @@
  */
 package org.opendaylight.restconf.nb.rfc8040.streams;
 
-import static java.util.Objects.requireNonNull;
-
-import com.google.common.annotations.VisibleForTesting;
 import java.io.IOException;
 import java.io.NotSerializableException;
 import java.io.ObjectInputStream;
 import java.io.ObjectOutputStream;
-import java.util.concurrent.ScheduledExecutorService;
 import javax.inject.Inject;
 import javax.inject.Singleton;
-import javax.servlet.http.HttpServletResponse;
-import org.eclipse.jetty.websocket.servlet.ServletUpgradeRequest;
-import org.eclipse.jetty.websocket.servlet.ServletUpgradeResponse;
-import org.eclipse.jetty.websocket.servlet.WebSocketCreator;
 import org.eclipse.jetty.websocket.servlet.WebSocketServlet;
 import org.eclipse.jetty.websocket.servlet.WebSocketServletFactory;
 import org.opendaylight.controller.config.threadpool.ScheduledThreadPool;
 import org.opendaylight.restconf.nb.rfc8040.streams.listeners.ListenersBroker;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 /**
  * Web-socket servlet listening on ws or wss schemas for created data-change-event or notification streams.
@@ -36,10 +26,7 @@ public final class WebSocketInitializer extends WebSocketServlet {
     @java.io.Serial
     private static final long serialVersionUID = 1L;
 
-    private final transient ScheduledExecutorService executorService;
-    private final transient ListenersBroker listenersBroker;
-    private final int maximumFragmentLength;
-    private final int heartbeatInterval;
+    private final transient WebSocketFactory creator;
     private final int idleTimeoutMillis;
 
     /**
@@ -51,10 +38,8 @@ public final class WebSocketInitializer extends WebSocketServlet {
     @Inject
     public WebSocketInitializer(final ScheduledThreadPool scheduledThreadPool,
             final ListenersBroker listenersBroker, final StreamsConfiguration configuration) {
-        executorService = scheduledThreadPool.getExecutor();
-        this.listenersBroker = requireNonNull(listenersBroker);
-        maximumFragmentLength = configuration.maximumFragmentLength();
-        heartbeatInterval = configuration.heartbeatInterval();
+        creator = new WebSocketFactory(scheduledThreadPool.getExecutor(), listenersBroker,
+            configuration.maximumFragmentLength(), configuration.heartbeatInterval());
         idleTimeoutMillis = configuration.idleTimeout();
     }
 
@@ -66,8 +51,7 @@ public final class WebSocketInitializer extends WebSocketServlet {
     @Override
     public void configure(final WebSocketServletFactory factory) {
         factory.getPolicy().setIdleTimeout(idleTimeoutMillis);
-        factory.setCreator(new WebSocketFactory(executorService, listenersBroker, maximumFragmentLength,
-            heartbeatInterval));
+        factory.setCreator(creator);
     }
 
     @java.io.Serial
@@ -85,65 +69,4 @@ public final class WebSocketInitializer extends WebSocketServlet {
     private static void throwNSE() throws NotSerializableException {
         throw new NotSerializableException(WebSocketInitializer.class.getName());
     }
-
-    /**
-     * Factory that is used for creation of new web-sockets based on HTTP/HTTPS upgrade request.
-     */
-    @VisibleForTesting
-    static final class WebSocketFactory implements WebSocketCreator {
-        private static final Logger LOG = LoggerFactory.getLogger(WebSocketFactory.class);
-
-        private final ScheduledExecutorService executorService;
-        private final ListenersBroker listenersBroker;
-        private final int maximumFragmentLength;
-        private final int heartbeatInterval;
-
-        /**
-         * Creation of the web-socket factory.
-         *
-         * @param executorService       Executor for creation of threads for controlling of web-socket sessions.
-         * @param maximumFragmentLength Maximum web-socket fragment length in number of Unicode code units (characters)
-         *                              (exceeded message length leads to fragmentation of messages).
-         * @param heartbeatInterval     Interval in milliseconds between sending of ping control frames.
-         */
-        WebSocketFactory(final ScheduledExecutorService executorService, final ListenersBroker listenersBroker,
-                final int maximumFragmentLength, final int heartbeatInterval) {
-            this.executorService = executorService;
-            this.listenersBroker = listenersBroker;
-            this.maximumFragmentLength = maximumFragmentLength;
-            this.heartbeatInterval = heartbeatInterval;
-        }
-
-        /**
-         * Creation of the new web-socket based on input HTTP/HTTPS upgrade request. Web-socket is created only if the
-         * data listener for input URI can be found (results in status code 101); otherwise status code 404 is set
-         * in upgrade response.
-         *
-         * @param servletUpgradeRequest  Upgrade request.
-         * @param servletUpgradeResponse Upgrade response.
-         * @return Created web-socket instance or {@code null} if the web-socket cannot be created.
-         */
-        @Override
-        public Object createWebSocket(final ServletUpgradeRequest servletUpgradeRequest,
-                final ServletUpgradeResponse servletUpgradeResponse) {
-            final var streamName = ListenersBroker.createStreamNameFromUri(
-                servletUpgradeRequest.getRequestURI().getRawPath());
-
-            final var listener = listenersBroker.listenerFor(streamName);
-            if (listener == null) {
-                LOG.debug("Listener for stream with name {} was not found.", streamName);
-                servletUpgradeResponse.setSuccess(false);
-                servletUpgradeResponse.setStatusCode(HttpServletResponse.SC_NOT_FOUND);
-                return null;
-            }
-
-            LOG.debug("Listener for stream with name {} has been found, web-socket session handler will be created",
-                streamName);
-            servletUpgradeResponse.setSuccess(true);
-            servletUpgradeResponse.setStatusCode(HttpServletResponse.SC_SWITCHING_PROTOCOLS);
-            // note: every web-socket manages PING process individually because this approach scales better than
-            //       sending of PING frames at once over all web-socket sessions
-            return new WebSocketSessionHandler(executorService, listener, maximumFragmentLength, heartbeatInterval);
-        }
-    }
 }
index 368f01c69695063f9493b190028cdfd9e06541ea..2c514172c309ef4c2e43f183b142887119abf1da 100644 (file)
@@ -22,7 +22,6 @@ import org.junit.jupiter.api.extension.ExtendWith;
 import org.mockito.Mock;
 import org.mockito.junit.jupiter.MockitoExtension;
 import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
-import org.opendaylight.restconf.nb.rfc8040.streams.WebSocketInitializer.WebSocketFactory;
 import org.opendaylight.restconf.nb.rfc8040.streams.listeners.AbstractNotificationListenerTest;
 import org.opendaylight.restconf.nb.rfc8040.streams.listeners.ListenersBroker;
 import org.opendaylight.yang.gen.v1.urn.sal.restconf.event.subscription.rev140708.CreateDataChangeEventSubscriptionInput1.Scope;