Disconnect HTTPServer from RequestDispatcher 36/113436/18
authorRobert Varga <robert.varga@pantheon.tech>
Wed, 4 Sep 2024 23:52:57 +0000 (01:52 +0200)
committerRobert Varga <robert.varga@pantheon.tech>
Tue, 10 Sep 2024 06:47:47 +0000 (08:47 +0200)
RequestDispatcher is useless layering violation sitting between Netty
pipeline and RESTCONF protocol implementation.

Do not use RequestDispatcher in HTTPServer and instead use
onTransportChannelEstablished() to wire a RestconfSession into the
pipeline.

This means we have centralized protocol-level initialization at the
correct layer and a proper class interfacing between a particular Netty
channel and the RestconfServer.

RestconfRequestDispatcher now becomes a private implementation detail,
acting as a the common router between RestconfSession and
RestconfServer (et al.).

A follow-up patch will deal with exposing RestconfSession from
NettyServerRequest.

JIRA: NETCONF-1379
Change-Id: Ie24e440dbf24a850428902076175fb569981d0d9
Signed-off-by: Robert Varga <robert.varga@pantheon.tech>
13 files changed:
protocol/restconf-server/pom.xml
protocol/restconf-server/src/main/java/org/opendaylight/restconf/server/NettyEndpoint.java
protocol/restconf-server/src/main/java/org/opendaylight/restconf/server/NettyServerRequest.java
protocol/restconf-server/src/main/java/org/opendaylight/restconf/server/RestconfRequestDispatcher.java
protocol/restconf-server/src/main/java/org/opendaylight/restconf/server/RestconfSession.java [new file with mode: 0644]
protocol/restconf-server/src/main/java/org/opendaylight/restconf/server/RestconfTransportChannelListener.java
protocol/restconf-server/src/test/java/org/opendaylight/restconf/server/AbstractRequestProcessorTest.java
transport/transport-http/src/main/java/org/opendaylight/netconf/transport/http/HTTPServer.java
transport/transport-http/src/main/java/org/opendaylight/netconf/transport/http/PlainHTTPServer.java
transport/transport-http/src/main/java/org/opendaylight/netconf/transport/http/RequestDispatcher.java
transport/transport-http/src/main/java/org/opendaylight/netconf/transport/http/TlsHTTPServer.java
transport/transport-http/src/test/java/org/opendaylight/netconf/transport/http/HttpClientServerTest.java
transport/transport-http/src/test/java/org/opendaylight/netconf/transport/http/SseClientServerTest.java

index b2340c5ee55f59be1e1d104169d5b9758d4f1a60..9685ba728772dfd62f77e3c85a3864332c2e0651 100644 (file)
             <groupId>io.netty</groupId>
             <artifactId>netty-codec-http</artifactId>
         </dependency>
+        <dependency>
+            <groupId>io.netty</groupId>
+            <artifactId>netty-codec-http2</artifactId>
+        </dependency>
         <dependency>
             <groupId>io.netty</groupId>
             <artifactId>netty-common</artifactId>
index 1e050ee6f130283ec31449882d4d87f26fb4be29..062381608f744bdba649db7f870286854081d14d 100644 (file)
@@ -37,14 +37,13 @@ public final class NettyEndpoint {
 
     public NettyEndpoint(final RestconfServer server, final PrincipalService principalService,
             final RestconfStream.Registry streamRegistry, final NettyEndpointConfiguration configuration) {
-        final var listener = new RestconfTransportChannelListener(streamRegistry, configuration);
-        final var dispatcher = new RestconfRequestDispatcher(server, principalService, configuration.baseUri(),
-            configuration.errorTagMapping(), configuration.defaultAcceptType(), configuration.prettyPrint());
+        final var listener = new RestconfTransportChannelListener(server, streamRegistry, principalService,
+            configuration);
 
         final var bootstrapFactory = new BootstrapFactory(configuration.groupName(), configuration.groupThreads());
         try {
             httpServer = HTTPServer.listen(listener, bootstrapFactory.newServerBootstrap(),
-                configuration.transportConfiguration(), dispatcher, principalService).get();
+                configuration.transportConfiguration(), principalService).get();
         } catch (UnsupportedConfigurationException | ExecutionException | InterruptedException e) {
             throw new IllegalStateException("Could not start RESTCONF server", e);
         }
index 24ca827e31e6c23eefd7aae329d3c7dba0b103d8..2f9fbe51cfb6c22312f57d505ca6cfb3387e96c6 100644 (file)
@@ -56,6 +56,7 @@ final class NettyServerRequest<T> extends MappingServerRequest<T> {
 
     @Override
     public @Nullable TransportSession session() {
+        // FIXME: return the correct NettyTransportSession
         return null;
     }
 }
index 6ae8f97497186ffa6ddbfdd788dff36e450c093b..9f856cb7128712443908056a39240e594a2da9eb 100644 (file)
@@ -17,7 +17,6 @@ import io.netty.handler.codec.http.FullHttpRequest;
 import io.netty.handler.codec.http.FullHttpResponse;
 import io.netty.util.AsciiString;
 import java.net.URI;
-import org.opendaylight.netconf.transport.http.RequestDispatcher;
 import org.opendaylight.restconf.api.query.PrettyPrintParam;
 import org.opendaylight.restconf.server.api.RestconfServer;
 import org.opendaylight.restconf.server.spi.ErrorTagMapping;
@@ -25,38 +24,37 @@ import org.opendaylight.yangtools.yang.common.ErrorTag;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public final class RestconfRequestDispatcher implements RequestDispatcher {
+final class RestconfRequestDispatcher {
     private static final Logger LOG = LoggerFactory.getLogger(RestconfRequestDispatcher.class);
 
-    private final URI baseURI;
+    private final URI baseUri;
     private final RestconfServer restconfService;
     private final PrincipalService principalService;
     private final ErrorTagMapping errorTagMapping;
     private final AsciiString defaultAcceptType;
     private final PrettyPrintParam defaultPrettyPrint;
 
-    public RestconfRequestDispatcher(final RestconfServer restconfService, final PrincipalService principalService,
+    RestconfRequestDispatcher(final RestconfServer restconfService, final PrincipalService principalService,
             final URI baseUri, final ErrorTagMapping errorTagMapping,
             final AsciiString defaultAcceptType, final PrettyPrintParam defaultPrettyPrint) {
         this.restconfService = requireNonNull(restconfService);
         this.principalService = requireNonNull(principalService);
-        this.baseURI = requireNonNull(baseUri);
+        this.baseUri = requireNonNull(baseUri);
         this.errorTagMapping = requireNonNull(errorTagMapping);
         this.defaultAcceptType = requireNonNull(defaultAcceptType);
         this.defaultPrettyPrint = requireNonNull(defaultPrettyPrint);
 
         LOG.info("{} initialized with service {}", getClass().getSimpleName(), restconfService.getClass());
         LOG.info("Base path: {}, default accept: {}, default pretty print: {}",
-            this.baseURI, defaultAcceptType, defaultPrettyPrint.value());
+            baseUri, defaultAcceptType, defaultPrettyPrint.value());
     }
 
-    @Override
     @SuppressWarnings("IllegalCatch")
-    public void dispatch(final FullHttpRequest request, final FutureCallback<FullHttpResponse> callback) {
+    void dispatch(final FullHttpRequest request, final FutureCallback<FullHttpResponse> callback) {
         LOG.debug("Dispatching {} {}", request.method(), request.uri());
 
         final var principal = principalService.acquirePrincipal(request);
-        final var params = new RequestParameters(baseURI, request, principal,
+        final var params = new RequestParameters(baseUri, request, principal,
             errorTagMapping, defaultAcceptType, defaultPrettyPrint);
         try {
             switch (params.pathParameters().apiResource()) {
diff --git a/protocol/restconf-server/src/main/java/org/opendaylight/restconf/server/RestconfSession.java b/protocol/restconf-server/src/main/java/org/opendaylight/restconf/server/RestconfSession.java
new file mode 100644 (file)
index 0000000..e55fad6
--- /dev/null
@@ -0,0 +1,68 @@
+/*
+ * Copyright (c) 2024 PANTHEON.tech, s.r.o. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.restconf.server;
+
+import static java.util.Objects.requireNonNull;
+
+import com.google.common.util.concurrent.FutureCallback;
+import io.netty.buffer.ByteBufUtil;
+import io.netty.buffer.Unpooled;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.SimpleChannelInboundHandler;
+import io.netty.handler.codec.http.DefaultFullHttpResponse;
+import io.netty.handler.codec.http.FullHttpRequest;
+import io.netty.handler.codec.http.FullHttpResponse;
+import io.netty.handler.codec.http.HttpHeaderNames;
+import io.netty.handler.codec.http.HttpHeaderValues;
+import io.netty.handler.codec.http.HttpResponseStatus;
+import io.netty.handler.codec.http2.HttpConversionUtil.ExtensionHeaderNames;
+import io.netty.util.AsciiString;
+import org.opendaylight.restconf.server.api.TransportSession;
+
+/**
+ * A RESTCONF session, as defined in <a href="https://www.rfc-editor.org/rfc/rfc8650#section-3.1">RFC8650</a>. It acts
+ * as glue between a Netty channel and a RESTCONF server and may be servicing one (HTTP/1.1) or more (HTTP/2) logical
+ * connections.
+ */
+final class RestconfSession extends SimpleChannelInboundHandler<FullHttpRequest> implements TransportSession {
+    private static final AsciiString STREAM_ID = ExtensionHeaderNames.STREAM_ID.text();
+
+    private final RestconfRequestDispatcher dispatcher;
+
+    RestconfSession(final RestconfRequestDispatcher dispatcher) {
+        this.dispatcher = requireNonNull(dispatcher);
+    }
+
+    @Override
+    protected void channelRead0(final ChannelHandlerContext ctx, final FullHttpRequest msg) {
+        dispatcher.dispatch(msg.retain(), new FutureCallback<>() {
+            @Override
+            public void onSuccess(final FullHttpResponse response) {
+                final var streamId = msg.headers().getInt(STREAM_ID);
+                if (streamId != null) {
+                    response.headers().setInt(STREAM_ID, streamId);
+                }
+                msg.release();
+                ctx.writeAndFlush(response);
+            }
+
+            @Override
+            public void onFailure(final Throwable throwable) {
+                final var message = throwable.getMessage();
+                final var content = message == null ? Unpooled.EMPTY_BUFFER
+                    : ByteBufUtil.writeUtf8(ctx.alloc(), message);
+                final var response = new DefaultFullHttpResponse(msg.protocolVersion(),
+                    HttpResponseStatus.INTERNAL_SERVER_ERROR, content);
+                response.headers()
+                    .set(HttpHeaderNames.CONTENT_TYPE, HttpHeaderValues.TEXT_PLAIN)
+                    .setInt(HttpHeaderNames.CONTENT_LENGTH, content.readableBytes());
+                onSuccess(response);
+            }
+        });
+    }
+}
index 181285b1c6c6515be4139a133dab37506128838f..f3b3b843165ac60a4e4f106b34898a85ac52cfe9 100644 (file)
@@ -11,6 +11,7 @@ import static java.util.Objects.requireNonNull;
 
 import org.opendaylight.netconf.transport.api.TransportChannel;
 import org.opendaylight.netconf.transport.api.TransportChannelListener;
+import org.opendaylight.netconf.transport.http.HTTPServer;
 import org.opendaylight.netconf.transport.http.SseUtils;
 import org.opendaylight.restconf.server.api.RestconfServer;
 import org.opendaylight.restconf.server.spi.RestconfStream;
@@ -25,16 +26,23 @@ final class RestconfTransportChannelListener implements TransportChannelListener
 
     private final RestconfStream.Registry streamRegistry;
     private final NettyEndpointConfiguration configuration;
+    private final RestconfRequestDispatcher dispatcher;
 
-    RestconfTransportChannelListener(final RestconfStream.Registry streamRegistry,
-            final NettyEndpointConfiguration configuration) {
+    RestconfTransportChannelListener(final RestconfServer server, final RestconfStream.Registry streamRegistry,
+            final PrincipalService principalService, final NettyEndpointConfiguration configuration) {
         this.streamRegistry = requireNonNull(streamRegistry);
         this.configuration = requireNonNull(configuration);
+        dispatcher = new RestconfRequestDispatcher(server, principalService, configuration.baseUri(),
+            configuration.errorTagMapping(), configuration.defaultAcceptType(), configuration.prettyPrint());
     }
 
     @Override
     public void onTransportChannelEstablished(final TransportChannel channel) {
-        SseUtils.enableServerSse(channel.channel(),
+        final var nettyChannel = channel.channel();
+
+        nettyChannel.pipeline().addLast(HTTPServer.REQUEST_DISPATCHER_HANDLER_NAME, new RestconfSession(dispatcher));
+
+        SseUtils.enableServerSse(nettyChannel,
             new RestconfStreamService(streamRegistry, configuration.baseUri(), configuration.errorTagMapping(),
                 configuration.defaultAcceptType(), configuration.prettyPrint()),
             configuration.sseMaximumFragmentLength().toJava(), configuration.sseHeartbeatIntervalMillis().toJava());
index 6ac0a15b1761bda0598379c831a3bc095f390b6e..d59f9ca47efb6150c31d8e5203be6f3506eaf0db 100644 (file)
@@ -25,7 +25,6 @@ import org.mockito.ArgumentCaptor;
 import org.mockito.Captor;
 import org.mockito.Mock;
 import org.mockito.junit.jupiter.MockitoExtension;
-import org.opendaylight.netconf.transport.http.RequestDispatcher;
 import org.opendaylight.restconf.api.ApiPath;
 import org.opendaylight.restconf.api.query.PrettyPrintParam;
 import org.opendaylight.restconf.server.TestUtils.TestEncoding;
@@ -57,7 +56,7 @@ public class AbstractRequestProcessorTest {
     @Captor
     private ArgumentCaptor<FullHttpResponse> responseCaptor;
 
-    private RequestDispatcher dispatcher;
+    private RestconfRequestDispatcher dispatcher;
 
     @BeforeEach
     void beforeEach() {
index 2084a0b98b6df8d7829ec91d26e7ce280e87b26c..c2631240afef7403fe4006ae56f24797b4a844ff 100644 (file)
@@ -7,26 +7,12 @@
  */
 package org.opendaylight.netconf.transport.http;
 
-import static io.netty.buffer.Unpooled.EMPTY_BUFFER;
-import static io.netty.handler.codec.http.HttpHeaderNames.CONTENT_LENGTH;
-import static io.netty.handler.codec.http.HttpHeaderNames.CONTENT_TYPE;
-import static io.netty.handler.codec.http.HttpHeaderValues.TEXT_PLAIN;
-import static io.netty.handler.codec.http.HttpResponseStatus.INTERNAL_SERVER_ERROR;
 import static java.util.Objects.requireNonNull;
-import static org.opendaylight.netconf.transport.http.Http2Utils.copyStreamId;
 
-import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.ListenableFuture;
 import io.netty.bootstrap.ServerBootstrap;
-import io.netty.buffer.Unpooled;
-import io.netty.channel.ChannelHandlerContext;
 import io.netty.channel.ChannelPipeline;
-import io.netty.channel.SimpleChannelInboundHandler;
-import io.netty.handler.codec.http.DefaultFullHttpResponse;
-import io.netty.handler.codec.http.FullHttpRequest;
-import io.netty.handler.codec.http.FullHttpResponse;
 import io.netty.handler.codec.http2.Http2ConnectionHandler;
-import java.nio.charset.StandardCharsets;
 import org.eclipse.jdt.annotation.NonNull;
 import org.eclipse.jdt.annotation.Nullable;
 import org.opendaylight.netconf.transport.api.TransportChannel;
@@ -42,15 +28,12 @@ import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.http.server
  * A {@link HTTPTransportStack} acting as a server.
  */
 public abstract sealed class HTTPServer extends HTTPTransportStack permits PlainHTTPServer, TlsHTTPServer {
-    static final String REQUEST_DISPATCHER_HANDLER_NAME = "request-dispatcher";
+    public static final String REQUEST_DISPATCHER_HANDLER_NAME = "request-dispatcher";
 
     private final AuthHandlerFactory authHandlerFactory;
-    private final @NonNull RequestDispatcher dispatcher;
 
-    HTTPServer(final TransportChannelListener listener, final RequestDispatcher dispatcher,
-            final AuthHandlerFactory authHandlerFactory) {
+    HTTPServer(final TransportChannelListener listener, final AuthHandlerFactory authHandlerFactory) {
         super(listener);
-        this.dispatcher = requireNonNull(dispatcher);
         this.authHandlerFactory = authHandlerFactory;
     }
 
@@ -60,15 +43,14 @@ public abstract sealed class HTTPServer extends HTTPTransportStack permits Plain
      * @param listener {@link TransportChannelListener} to notify when the session is established
      * @param bootstrap {@link ServerBootstrap} to use for the underlying Netty server channel
      * @param listenParams Listening parameters
-     * @param dispatcher server logic layer implementation as {@link RequestDispatcher}
      * @return A future
      * @throws UnsupportedConfigurationException when {@code listenParams} contains an unsupported options
      * @throws NullPointerException if any argument is {@code null}
      */
     public static final @NonNull ListenableFuture<HTTPServer> listen(final TransportChannelListener listener,
-            final ServerBootstrap bootstrap, final HttpServerStackGrouping listenParams,
-            final RequestDispatcher dispatcher) throws UnsupportedConfigurationException {
-        return listen(listener, bootstrap, listenParams, dispatcher, null);
+            final ServerBootstrap bootstrap, final HttpServerStackGrouping listenParams)
+                throws UnsupportedConfigurationException {
+        return listen(listener, bootstrap, listenParams, null);
     }
 
     /**
@@ -77,7 +59,6 @@ public abstract sealed class HTTPServer extends HTTPTransportStack permits Plain
      * @param listener {@link TransportChannelListener} to notify when the session is established
      * @param bootstrap {@link ServerBootstrap} to use for the underlying Netty server channel
      * @param listenParams Listening parameters
-     * @param dispatcher server logic layer implementation as {@link RequestDispatcher}
      * @param authHandlerFactory {@link AuthHandlerFactory} instance, provides channel handler serving the request
      *      authentication; optional, if defined the Basic Auth settings of listenParams will be ignored
      * @return A future
@@ -86,31 +67,30 @@ public abstract sealed class HTTPServer extends HTTPTransportStack permits Plain
      */
     public static final @NonNull ListenableFuture<HTTPServer> listen(final TransportChannelListener listener,
             final ServerBootstrap bootstrap, final HttpServerStackGrouping listenParams,
-            final RequestDispatcher dispatcher, final @Nullable AuthHandlerFactory authHandlerFactory)
-            throws UnsupportedConfigurationException {
+            final @Nullable AuthHandlerFactory authHandlerFactory) throws UnsupportedConfigurationException {
         final var transport = requireNonNull(listenParams).getTransport();
         return switch (transport) {
-            case Tcp tcpCase -> listen(listener, bootstrap, tcpCase, dispatcher, authHandlerFactory);
-            case Tls tlsCase -> listen(listener, bootstrap, tlsCase, dispatcher, authHandlerFactory);
+            case Tcp tcpCase -> listen(listener, bootstrap, tcpCase, authHandlerFactory);
+            case Tls tlsCase -> listen(listener, bootstrap, tlsCase, authHandlerFactory);
             default -> throw new UnsupportedConfigurationException("Unsupported transport: " + transport);
         };
     }
 
     private static @NonNull ListenableFuture<HTTPServer> listen(final TransportChannelListener listener,
-            final ServerBootstrap bootstrap, final Tcp tcpCase, final RequestDispatcher dispatcher,
-            final @Nullable AuthHandlerFactory authHandlerFactory) throws UnsupportedConfigurationException {
+            final ServerBootstrap bootstrap, final Tcp tcpCase, final @Nullable AuthHandlerFactory authHandlerFactory)
+                throws UnsupportedConfigurationException {
         final var tcp = tcpCase.getTcp();
-        final var server = new PlainHTTPServer(listener, dispatcher, authHandlerFactory != null ? authHandlerFactory
+        final var server = new PlainHTTPServer(listener, authHandlerFactory != null ? authHandlerFactory
             : BasicAuthHandlerFactory.ofNullable(tcp.getHttpServerParameters()));
         return transformUnderlay(server,
             TCPServer.listen(server.asListener(), bootstrap, tcp.nonnullTcpServerParameters()));
     }
 
     private static @NonNull ListenableFuture<HTTPServer> listen(final TransportChannelListener listener,
-            final ServerBootstrap bootstrap, final Tls tlsCase, final RequestDispatcher dispatcher,
-            final @Nullable AuthHandlerFactory authHandlerFactory) throws UnsupportedConfigurationException {
+            final ServerBootstrap bootstrap, final Tls tlsCase, final @Nullable AuthHandlerFactory authHandlerFactory)
+                throws UnsupportedConfigurationException {
         final var tls = tlsCase.getTls();
-        final var server = new TlsHTTPServer(listener, dispatcher, authHandlerFactory != null ? authHandlerFactory
+        final var server = new TlsHTTPServer(listener, authHandlerFactory != null ? authHandlerFactory
             : BasicAuthHandlerFactory.ofNullable(tls.getHttpServerParameters()));
         return transformUnderlay(server,
             TLSServer.listen(server.asListener(), bootstrap, tls.nonnullTcpServerParameters(),
@@ -128,33 +108,6 @@ public abstract sealed class HTTPServer extends HTTPTransportStack permits Plain
             pipeline.addLast(authHandlerFactory.create());
         }
 
-        pipeline.addLast(REQUEST_DISPATCHER_HANDLER_NAME, new SimpleChannelInboundHandler<FullHttpRequest>() {
-            @Override
-            protected void channelRead0(final ChannelHandlerContext ctx, final FullHttpRequest request) {
-                dispatcher.dispatch(request.retain(), new FutureCallback<>() {
-                    @Override
-                    public void onSuccess(final FullHttpResponse response) {
-                        copyStreamId(request, response);
-                        request.release();
-                        ctx.writeAndFlush(response);
-                    }
-
-                    @Override
-                    public void onFailure(final Throwable throwable) {
-                        final var message = throwable.getMessage();
-                        final var content = message == null ? EMPTY_BUFFER
-                            : Unpooled.wrappedBuffer(message.getBytes(StandardCharsets.UTF_8));
-                        final var response = new DefaultFullHttpResponse(request.protocolVersion(),
-                            INTERNAL_SERVER_ERROR, content);
-                        response.headers()
-                            .set(CONTENT_TYPE, TEXT_PLAIN)
-                            .setInt(CONTENT_LENGTH, response.content().readableBytes());
-                        onSuccess(response);
-                    }
-                });
-            }
-        });
-
         addTransportChannel(new HTTPTransportChannel(underlayChannel));
     }
 
index 88839ed522db99b12a27ab91c5c0ee75e8781726..8cb3c0dc60c555b37e581818fea11fc81850376d 100644 (file)
@@ -27,9 +27,8 @@ import org.opendaylight.netconf.transport.api.TransportChannelListener;
  * An {@link HTTPServer} operating over plain TCP.
  */
 final class PlainHTTPServer extends HTTPServer {
-    PlainHTTPServer(final TransportChannelListener listener, final RequestDispatcher dispatcher,
-            final AuthHandlerFactory authHandlerFactory) {
-        super(listener, dispatcher, authHandlerFactory);
+    PlainHTTPServer(final TransportChannelListener listener, final AuthHandlerFactory authHandlerFactory) {
+        super(listener, authHandlerFactory);
     }
 
     @Override
index 918f2b3f6fa3bd90b63d9a8867d0d133cedf3907..88552a259e02da422b40d9da064afde167cf7b2c 100644 (file)
@@ -14,9 +14,14 @@ import org.eclipse.jdt.annotation.NonNullByDefault;
 
 /**
  * Functional interface for HTTP request dispatcher.
+ *
+ * @deprecated This interface is an Operations layer concern (i.e. request/response semantics). As such it is a clear
+ *             layering violation as {@code org.opendaylight.netconf.transport} deals only with the Secure Transport
+ *             layer.
  */
 @NonNullByDefault
 @FunctionalInterface
+@Deprecated(forRemoval = true, since = "8.0.2")
 public interface RequestDispatcher {
     /**
      * Performs {@link FullHttpRequest} processing. Any error occurred is expected either to be returned within
index b0613d71943c9b6b6380e72c41a7a235a5e1a406..b21ac94b16d930c9022d1d43c489eec4c73257e9 100644 (file)
@@ -21,9 +21,8 @@ import org.opendaylight.netconf.transport.api.TransportChannelListener;
  * An {@link HTTPServer} operating over TLS.
  */
 final class TlsHTTPServer extends HTTPServer {
-    TlsHTTPServer(final TransportChannelListener listener, final RequestDispatcher dispatcher,
-            final AuthHandlerFactory authHandlerFactory) {
-        super(listener, dispatcher, authHandlerFactory);
+    TlsHTTPServer(final TransportChannelListener listener, final AuthHandlerFactory authHandlerFactory) {
+        super(listener, authHandlerFactory);
     }
 
     @Override
index c2c870b96c3fd094ab0446a9e1d6d39cd241879e..48defcff0417e1791995dd2f34f5f46d19e5d84b 100644 (file)
@@ -18,6 +18,7 @@ import static io.netty.handler.codec.http.HttpVersion.HTTP_1_1;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.timeout;
 import static org.mockito.Mockito.verify;
@@ -29,8 +30,11 @@ import static org.opendaylight.netconf.transport.http.TestUtils.freePort;
 import static org.opendaylight.netconf.transport.http.TestUtils.generateX509CertData;
 import static org.opendaylight.netconf.transport.http.TestUtils.invoke;
 
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.SimpleChannelInboundHandler;
 import io.netty.handler.codec.http.DefaultFullHttpRequest;
 import io.netty.handler.codec.http.DefaultFullHttpResponse;
+import io.netty.handler.codec.http.FullHttpRequest;
 import io.netty.handler.codec.http.HttpMethod;
 import java.net.InetAddress;
 import java.nio.charset.StandardCharsets;
@@ -41,11 +45,13 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import org.junit.jupiter.api.AfterAll;
 import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.extension.ExtendWith;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.ValueSource;
 import org.mockito.Mock;
 import org.mockito.junit.jupiter.MockitoExtension;
+import org.opendaylight.netconf.transport.api.TransportChannel;
 import org.opendaylight.netconf.transport.api.TransportChannelListener;
 import org.opendaylight.netconf.transport.tcp.BootstrapFactory;
 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.http.client.rev240208.HttpClientStackGrouping;
@@ -69,7 +75,6 @@ class HttpClientServerTest {
         };
 
     private static ScheduledExecutorService scheduledExecutor;
-    private static RequestDispatcher requestDispatcher;
     private static BootstrapFactory bootstrapFactory;
     private static String localAddress;
 
@@ -87,23 +92,6 @@ class HttpClientServerTest {
         bootstrapFactory = new BootstrapFactory("IntegrationTest", 0);
         localAddress = InetAddress.getLoopbackAddress().getHostAddress();
         scheduledExecutor = Executors.newSingleThreadScheduledExecutor();
-
-        requestDispatcher = (request, callback) -> {
-            // emulate asynchronous server request processing - run in separate thread with 100 millis delay
-            scheduledExecutor.schedule(() -> {
-                // return 200 response with a content built from request parameters
-                final var method = request.method().name();
-                final var uri = request.uri();
-                final var payload = request.content().readableBytes() > 0
-                    ? request.content().toString(StandardCharsets.UTF_8) : "";
-                final var responseMessage = RESPONSE_TEMPLATE.formatted(method, uri, payload);
-                final var response = new DefaultFullHttpResponse(request.protocolVersion(), OK,
-                    wrappedBuffer(responseMessage.getBytes(StandardCharsets.UTF_8)));
-                response.headers().set(CONTENT_TYPE, TEXT_PLAIN)
-                    .setInt(CONTENT_LENGTH, response.content().readableBytes());
-                callback.onSuccess(response);
-            }, 100, TimeUnit.MILLISECONDS);
-        };
     }
 
     @AfterAll
@@ -112,6 +100,34 @@ class HttpClientServerTest {
         scheduledExecutor.shutdown();
     }
 
+    @BeforeEach
+    void beforeEach() {
+        doAnswer(inv -> {
+            inv.<TransportChannel>getArgument(0).channel().pipeline()
+                .addLast(new SimpleChannelInboundHandler<>(FullHttpRequest.class) {
+                    @Override
+                    protected void channelRead0(final ChannelHandlerContext ctx, final FullHttpRequest msg) {
+                        // return 200 response with a content built from request parameters
+                        final var method = msg.method().name();
+                        final var uri = msg.uri();
+                        final var payload = msg.content().readableBytes() > 0
+                            ? msg.content().toString(StandardCharsets.UTF_8) : "";
+                        final var responseMessage = RESPONSE_TEMPLATE.formatted(method, uri, payload);
+                        final var response = new DefaultFullHttpResponse(msg.protocolVersion(), OK,
+                            wrappedBuffer(responseMessage.getBytes(StandardCharsets.UTF_8)));
+                        response.headers()
+                            .set(CONTENT_TYPE, TEXT_PLAIN)
+                            .setInt(CONTENT_LENGTH, response.content().readableBytes());
+                        Http2Utils.copyStreamId(msg, response);
+
+                        // emulate asynchronous server request processing - run in separate thread with 100 millis delay
+                        scheduledExecutor.schedule(() -> ctx.writeAndFlush(response), 100, TimeUnit.MILLISECONDS);
+                    }
+                });
+            return null;
+        }).when(serverTransportListener).onTransportChannelEstablished(any());
+    }
+
     @ParameterizedTest(name = "TCP with no authorization, HTTP/2: {0}")
     @ValueSource(booleans = {false, true})
     void noAuthTcp(final boolean http2) throws Exception {
@@ -183,20 +199,21 @@ class HttpClientServerTest {
 
     private void integrationTest(final boolean http2, final AuthHandlerFactory authHandlerFactory) throws Exception {
         final var server = HTTPServer.listen(serverTransportListener, bootstrapFactory.newServerBootstrap(),
-            serverConfig, requestDispatcher, authHandlerFactory).get(2, TimeUnit.SECONDS);
+            serverConfig, authHandlerFactory).get(2, TimeUnit.SECONDS);
         try {
             final var client = HTTPClient.connect(clientTransportListener, bootstrapFactory.newBootstrap(),
                     clientConfig, http2).get(2, TimeUnit.SECONDS);
             try {
-                verify(serverTransportListener, timeout(2000)).onTransportChannelEstablished(any());
                 verify(clientTransportListener, timeout(2000)).onTransportChannelEstablished(any());
+                verify(serverTransportListener, timeout(2000)).onTransportChannelEstablished(any());
 
                 for (var method : METHODS) {
                     final var uri = nextValue("URI");
                     final var payload = nextValue("PAYLOAD");
                     final var request = new DefaultFullHttpRequest(HTTP_1_1, HttpMethod.valueOf(method),
                         uri, wrappedBuffer(payload.getBytes(StandardCharsets.UTF_8)));
-                    request.headers().set(CONTENT_TYPE, TEXT_PLAIN)
+                    request.headers()
+                        .set(CONTENT_TYPE, TEXT_PLAIN)
                         .setInt(CONTENT_LENGTH, request.content().readableBytes())
                         // allow multiple requests on same connections
                         .set(CONNECTION, KEEP_ALIVE);
index 397202dba213c8c872c3d32a25f569140b196a7d..9030d692c44c672724ee5341b7fa85df20fa7a0e 100644 (file)
@@ -32,8 +32,11 @@ import static org.opendaylight.netconf.transport.http.TestUtils.invoke;
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.Unpooled;
 import io.netty.channel.Channel;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.SimpleChannelInboundHandler;
 import io.netty.handler.codec.http.DefaultFullHttpRequest;
 import io.netty.handler.codec.http.DefaultFullHttpResponse;
+import io.netty.handler.codec.http.FullHttpRequest;
 import java.net.InetAddress;
 import java.nio.charset.StandardCharsets;
 import java.time.Duration;
@@ -72,7 +75,6 @@ class SseClientServerTest {
     private static final List<String> DATA_VALUES = IntStream.rangeClosed(1, 10)
         .mapToObj(num -> "value " + num).toList();
 
-    private static RequestDispatcher requestDispatcher;
     private static BootstrapFactory bootstrapFactory;
     private static String localAddress;
 
@@ -96,13 +98,6 @@ class SseClientServerTest {
     static void beforeAll() {
         bootstrapFactory = new BootstrapFactory("IntegrationTest", 0);
         localAddress = InetAddress.getLoopbackAddress().getHostAddress();
-        requestDispatcher = (request, callback) -> {
-            final var response = DATA_URI.equals(request.uri())
-                ? new DefaultFullHttpResponse(request.protocolVersion(), OK, OK_CONTENT.copy())
-                : new DefaultFullHttpResponse(request.protocolVersion(), NOT_FOUND, Unpooled.EMPTY_BUFFER);
-            response.headers().set(CONTENT_LENGTH, response.content().readableBytes());
-            callback.onSuccess(response);
-        };
     }
 
     @AfterAll
@@ -115,10 +110,23 @@ class SseClientServerTest {
         clientEventStreamService = null;
         serverEventStreamService = new TestStreamService();
         // init SSE layer on top of HTTP layer using Transport channel listeners
-        serverTransportListener = new TestTransportListener(channel ->
-            SseUtils.enableServerSse(channel, serverEventStreamService, 0, 0));
+        serverTransportListener = new TestTransportListener(channel -> {
+            channel.pipeline().addLast(HTTPServer.REQUEST_DISPATCHER_HANDLER_NAME,
+                new SimpleChannelInboundHandler<>(FullHttpRequest.class) {
+                    @Override
+                    protected void channelRead0(final ChannelHandlerContext ctx, final FullHttpRequest msg) {
+                        final var response = DATA_URI.equals(msg.uri())
+                            ? new DefaultFullHttpResponse(msg.protocolVersion(), OK, OK_CONTENT.copy())
+                            : new DefaultFullHttpResponse(msg.protocolVersion(), NOT_FOUND, Unpooled.EMPTY_BUFFER);
+                        response.headers().set(CONTENT_LENGTH, response.content().readableBytes());
+                        Http2Utils.copyStreamId(msg, response);
+                        ctx.writeAndFlush(response);
+                    }
+                });
+            SseUtils.enableServerSse(channel, serverEventStreamService, 0, 0);
+        });
         clientTransportListener = new TestTransportListener(channel ->
-            SseClientServerTest.this.clientEventStreamService = SseUtils.enableClientSse(channel));
+            clientEventStreamService = SseUtils.enableClientSse(channel));
     }
 
     @ParameterizedTest(name = "TCP with no authorization, HTTP/2: {0}")
@@ -166,7 +174,7 @@ class SseClientServerTest {
 
     private void integrationTest(final boolean http2) throws Exception {
         final var server = HTTPServer.listen(serverTransportListener, bootstrapFactory.newServerBootstrap(),
-            serverConfig, requestDispatcher).get(2, TimeUnit.SECONDS);
+            serverConfig).get(2, TimeUnit.SECONDS);
         try {
             final var client = HTTPClient.connect(clientTransportListener, bootstrapFactory.newBootstrap(),
                 clientConfig, http2).get(2, TimeUnit.SECONDS);