From 1d54816dbaf3cbc14c72ec80e643bf6fda1fc093 Mon Sep 17 00:00:00 2001 From: Robert Varga Date: Mon, 30 Oct 2023 23:33:25 +0100 Subject: [PATCH] Factor out WebSocketFactory WebSocketFactory is a simple immutable thing. Factor it out of WebSocketInitializer and turn it into a record. Also share a single instance for all invocations of WebSocketInitializer.configure() -- allowing us to ditch a number of fields. JIRA: NETCONF-1102 Change-Id: I83b2d74eab081d35367133519bb23df0276ed7a3 Signed-off-by: Robert Varga --- .../nb/rfc8040/streams/WebSocketFactory.java | 72 ++++++++++++++++ .../rfc8040/streams/WebSocketInitializer.java | 85 +------------------ .../rfc8040/streams/WebSocketFactoryTest.java | 1 - 3 files changed, 76 insertions(+), 82 deletions(-) create mode 100644 restconf/restconf-nb/src/main/java/org/opendaylight/restconf/nb/rfc8040/streams/WebSocketFactory.java diff --git a/restconf/restconf-nb/src/main/java/org/opendaylight/restconf/nb/rfc8040/streams/WebSocketFactory.java b/restconf/restconf-nb/src/main/java/org/opendaylight/restconf/nb/rfc8040/streams/WebSocketFactory.java new file mode 100644 index 0000000000..56869b0ebc --- /dev/null +++ b/restconf/restconf-nb/src/main/java/org/opendaylight/restconf/nb/rfc8040/streams/WebSocketFactory.java @@ -0,0 +1,72 @@ +/* + * Copyright (c) 2023 PANTHEON.tech, s.r.o. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.restconf.nb.rfc8040.streams; + +import static java.util.Objects.requireNonNull; + +import java.util.concurrent.ScheduledExecutorService; +import javax.servlet.http.HttpServletResponse; +import org.eclipse.jetty.websocket.servlet.ServletUpgradeRequest; +import org.eclipse.jetty.websocket.servlet.ServletUpgradeResponse; +import org.eclipse.jetty.websocket.servlet.WebSocketCreator; +import org.opendaylight.restconf.nb.rfc8040.streams.listeners.ListenersBroker; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Factory that is used for creation of new web-sockets based on HTTP/HTTPS upgrade request. + * + * @param executorService Executor for creation of threads for controlling of web-socket sessions. + * @param maximumFragmentLength Maximum web-socket fragment length in number of Unicode code units (characters) + * (exceeded message length leads to fragmentation of messages). + * @param heartbeatInterval Interval in milliseconds between sending of ping control frames. + */ +record WebSocketFactory( + ScheduledExecutorService executorService, + ListenersBroker listenersBroker, + int maximumFragmentLength, + int heartbeatInterval) implements WebSocketCreator { + private static final Logger LOG = LoggerFactory.getLogger(WebSocketFactory.class); + + WebSocketFactory { + requireNonNull(executorService); + requireNonNull(listenersBroker); + } + + /** + * Creation of the new web-socket based on input HTTP/HTTPS upgrade request. Web-socket is created only if the + * data listener for input URI can be found (results in status code 101); otherwise status code 404 is set + * in upgrade response. + * + * @param servletUpgradeRequest Upgrade request. + * @param servletUpgradeResponse Upgrade response. + * @return Created web-socket instance or {@code null} if the web-socket cannot be created. + */ + @Override + public Object createWebSocket(final ServletUpgradeRequest servletUpgradeRequest, + final ServletUpgradeResponse servletUpgradeResponse) { + final var streamName = ListenersBroker.createStreamNameFromUri( + servletUpgradeRequest.getRequestURI().getRawPath()); + + final var listener = listenersBroker.listenerFor(streamName); + if (listener == null) { + LOG.debug("Listener for stream with name {} was not found.", streamName); + servletUpgradeResponse.setSuccess(false); + servletUpgradeResponse.setStatusCode(HttpServletResponse.SC_NOT_FOUND); + return null; + } + + LOG.debug("Listener for stream with name {} has been found, web-socket session handler will be created", + streamName); + servletUpgradeResponse.setSuccess(true); + servletUpgradeResponse.setStatusCode(HttpServletResponse.SC_SWITCHING_PROTOCOLS); + // note: every web-socket manages PING process individually because this approach scales better than sending + // of PING frames at once over all web-socket sessions + return new WebSocketSessionHandler(executorService, listener, maximumFragmentLength, heartbeatInterval); + } +} \ No newline at end of file diff --git a/restconf/restconf-nb/src/main/java/org/opendaylight/restconf/nb/rfc8040/streams/WebSocketInitializer.java b/restconf/restconf-nb/src/main/java/org/opendaylight/restconf/nb/rfc8040/streams/WebSocketInitializer.java index 206e57cc5f..5e51c1916a 100644 --- a/restconf/restconf-nb/src/main/java/org/opendaylight/restconf/nb/rfc8040/streams/WebSocketInitializer.java +++ b/restconf/restconf-nb/src/main/java/org/opendaylight/restconf/nb/rfc8040/streams/WebSocketInitializer.java @@ -7,26 +7,16 @@ */ package org.opendaylight.restconf.nb.rfc8040.streams; -import static java.util.Objects.requireNonNull; - -import com.google.common.annotations.VisibleForTesting; import java.io.IOException; import java.io.NotSerializableException; import java.io.ObjectInputStream; import java.io.ObjectOutputStream; -import java.util.concurrent.ScheduledExecutorService; import javax.inject.Inject; import javax.inject.Singleton; -import javax.servlet.http.HttpServletResponse; -import org.eclipse.jetty.websocket.servlet.ServletUpgradeRequest; -import org.eclipse.jetty.websocket.servlet.ServletUpgradeResponse; -import org.eclipse.jetty.websocket.servlet.WebSocketCreator; import org.eclipse.jetty.websocket.servlet.WebSocketServlet; import org.eclipse.jetty.websocket.servlet.WebSocketServletFactory; import org.opendaylight.controller.config.threadpool.ScheduledThreadPool; import org.opendaylight.restconf.nb.rfc8040.streams.listeners.ListenersBroker; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; /** * Web-socket servlet listening on ws or wss schemas for created data-change-event or notification streams. @@ -36,10 +26,7 @@ public final class WebSocketInitializer extends WebSocketServlet { @java.io.Serial private static final long serialVersionUID = 1L; - private final transient ScheduledExecutorService executorService; - private final transient ListenersBroker listenersBroker; - private final int maximumFragmentLength; - private final int heartbeatInterval; + private final transient WebSocketFactory creator; private final int idleTimeoutMillis; /** @@ -51,10 +38,8 @@ public final class WebSocketInitializer extends WebSocketServlet { @Inject public WebSocketInitializer(final ScheduledThreadPool scheduledThreadPool, final ListenersBroker listenersBroker, final StreamsConfiguration configuration) { - executorService = scheduledThreadPool.getExecutor(); - this.listenersBroker = requireNonNull(listenersBroker); - maximumFragmentLength = configuration.maximumFragmentLength(); - heartbeatInterval = configuration.heartbeatInterval(); + creator = new WebSocketFactory(scheduledThreadPool.getExecutor(), listenersBroker, + configuration.maximumFragmentLength(), configuration.heartbeatInterval()); idleTimeoutMillis = configuration.idleTimeout(); } @@ -66,8 +51,7 @@ public final class WebSocketInitializer extends WebSocketServlet { @Override public void configure(final WebSocketServletFactory factory) { factory.getPolicy().setIdleTimeout(idleTimeoutMillis); - factory.setCreator(new WebSocketFactory(executorService, listenersBroker, maximumFragmentLength, - heartbeatInterval)); + factory.setCreator(creator); } @java.io.Serial @@ -85,65 +69,4 @@ public final class WebSocketInitializer extends WebSocketServlet { private static void throwNSE() throws NotSerializableException { throw new NotSerializableException(WebSocketInitializer.class.getName()); } - - /** - * Factory that is used for creation of new web-sockets based on HTTP/HTTPS upgrade request. - */ - @VisibleForTesting - static final class WebSocketFactory implements WebSocketCreator { - private static final Logger LOG = LoggerFactory.getLogger(WebSocketFactory.class); - - private final ScheduledExecutorService executorService; - private final ListenersBroker listenersBroker; - private final int maximumFragmentLength; - private final int heartbeatInterval; - - /** - * Creation of the web-socket factory. - * - * @param executorService Executor for creation of threads for controlling of web-socket sessions. - * @param maximumFragmentLength Maximum web-socket fragment length in number of Unicode code units (characters) - * (exceeded message length leads to fragmentation of messages). - * @param heartbeatInterval Interval in milliseconds between sending of ping control frames. - */ - WebSocketFactory(final ScheduledExecutorService executorService, final ListenersBroker listenersBroker, - final int maximumFragmentLength, final int heartbeatInterval) { - this.executorService = executorService; - this.listenersBroker = listenersBroker; - this.maximumFragmentLength = maximumFragmentLength; - this.heartbeatInterval = heartbeatInterval; - } - - /** - * Creation of the new web-socket based on input HTTP/HTTPS upgrade request. Web-socket is created only if the - * data listener for input URI can be found (results in status code 101); otherwise status code 404 is set - * in upgrade response. - * - * @param servletUpgradeRequest Upgrade request. - * @param servletUpgradeResponse Upgrade response. - * @return Created web-socket instance or {@code null} if the web-socket cannot be created. - */ - @Override - public Object createWebSocket(final ServletUpgradeRequest servletUpgradeRequest, - final ServletUpgradeResponse servletUpgradeResponse) { - final var streamName = ListenersBroker.createStreamNameFromUri( - servletUpgradeRequest.getRequestURI().getRawPath()); - - final var listener = listenersBroker.listenerFor(streamName); - if (listener == null) { - LOG.debug("Listener for stream with name {} was not found.", streamName); - servletUpgradeResponse.setSuccess(false); - servletUpgradeResponse.setStatusCode(HttpServletResponse.SC_NOT_FOUND); - return null; - } - - LOG.debug("Listener for stream with name {} has been found, web-socket session handler will be created", - streamName); - servletUpgradeResponse.setSuccess(true); - servletUpgradeResponse.setStatusCode(HttpServletResponse.SC_SWITCHING_PROTOCOLS); - // note: every web-socket manages PING process individually because this approach scales better than - // sending of PING frames at once over all web-socket sessions - return new WebSocketSessionHandler(executorService, listener, maximumFragmentLength, heartbeatInterval); - } - } } diff --git a/restconf/restconf-nb/src/test/java/org/opendaylight/restconf/nb/rfc8040/streams/WebSocketFactoryTest.java b/restconf/restconf-nb/src/test/java/org/opendaylight/restconf/nb/rfc8040/streams/WebSocketFactoryTest.java index 368f01c696..2c514172c3 100644 --- a/restconf/restconf-nb/src/test/java/org/opendaylight/restconf/nb/rfc8040/streams/WebSocketFactoryTest.java +++ b/restconf/restconf-nb/src/test/java/org/opendaylight/restconf/nb/rfc8040/streams/WebSocketFactoryTest.java @@ -22,7 +22,6 @@ import org.junit.jupiter.api.extension.ExtendWith; import org.mockito.Mock; import org.mockito.junit.jupiter.MockitoExtension; import org.opendaylight.mdsal.common.api.LogicalDatastoreType; -import org.opendaylight.restconf.nb.rfc8040.streams.WebSocketInitializer.WebSocketFactory; import org.opendaylight.restconf.nb.rfc8040.streams.listeners.AbstractNotificationListenerTest; import org.opendaylight.restconf.nb.rfc8040.streams.listeners.ListenersBroker; import org.opendaylight.yang.gen.v1.urn.sal.restconf.event.subscription.rev140708.CreateDataChangeEventSubscriptionInput1.Scope; -- 2.36.6