<groupId>io.netty</groupId>
<artifactId>netty-codec-http</artifactId>
</dependency>
+ <dependency>
+ <groupId>io.netty</groupId>
+ <artifactId>netty-codec-http2</artifactId>
+ </dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-common</artifactId>
public NettyEndpoint(final RestconfServer server, final PrincipalService principalService,
final RestconfStream.Registry streamRegistry, final NettyEndpointConfiguration configuration) {
- final var listener = new RestconfTransportChannelListener(streamRegistry, configuration);
- final var dispatcher = new RestconfRequestDispatcher(server, principalService, configuration.baseUri(),
- configuration.errorTagMapping(), configuration.defaultAcceptType(), configuration.prettyPrint());
+ final var listener = new RestconfTransportChannelListener(server, streamRegistry, principalService,
+ configuration);
final var bootstrapFactory = new BootstrapFactory(configuration.groupName(), configuration.groupThreads());
try {
httpServer = HTTPServer.listen(listener, bootstrapFactory.newServerBootstrap(),
- configuration.transportConfiguration(), dispatcher, principalService).get();
+ configuration.transportConfiguration(), principalService).get();
} catch (UnsupportedConfigurationException | ExecutionException | InterruptedException e) {
throw new IllegalStateException("Could not start RESTCONF server", e);
}
@Override
public @Nullable TransportSession session() {
+ // FIXME: return the correct NettyTransportSession
return null;
}
}
import io.netty.handler.codec.http.FullHttpResponse;
import io.netty.util.AsciiString;
import java.net.URI;
-import org.opendaylight.netconf.transport.http.RequestDispatcher;
import org.opendaylight.restconf.api.query.PrettyPrintParam;
import org.opendaylight.restconf.server.api.RestconfServer;
import org.opendaylight.restconf.server.spi.ErrorTagMapping;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public final class RestconfRequestDispatcher implements RequestDispatcher {
+final class RestconfRequestDispatcher {
private static final Logger LOG = LoggerFactory.getLogger(RestconfRequestDispatcher.class);
- private final URI baseURI;
+ private final URI baseUri;
private final RestconfServer restconfService;
private final PrincipalService principalService;
private final ErrorTagMapping errorTagMapping;
private final AsciiString defaultAcceptType;
private final PrettyPrintParam defaultPrettyPrint;
- public RestconfRequestDispatcher(final RestconfServer restconfService, final PrincipalService principalService,
+ RestconfRequestDispatcher(final RestconfServer restconfService, final PrincipalService principalService,
final URI baseUri, final ErrorTagMapping errorTagMapping,
final AsciiString defaultAcceptType, final PrettyPrintParam defaultPrettyPrint) {
this.restconfService = requireNonNull(restconfService);
this.principalService = requireNonNull(principalService);
- this.baseURI = requireNonNull(baseUri);
+ this.baseUri = requireNonNull(baseUri);
this.errorTagMapping = requireNonNull(errorTagMapping);
this.defaultAcceptType = requireNonNull(defaultAcceptType);
this.defaultPrettyPrint = requireNonNull(defaultPrettyPrint);
LOG.info("{} initialized with service {}", getClass().getSimpleName(), restconfService.getClass());
LOG.info("Base path: {}, default accept: {}, default pretty print: {}",
- this.baseURI, defaultAcceptType, defaultPrettyPrint.value());
+ baseUri, defaultAcceptType, defaultPrettyPrint.value());
}
- @Override
@SuppressWarnings("IllegalCatch")
- public void dispatch(final FullHttpRequest request, final FutureCallback<FullHttpResponse> callback) {
+ void dispatch(final FullHttpRequest request, final FutureCallback<FullHttpResponse> callback) {
LOG.debug("Dispatching {} {}", request.method(), request.uri());
final var principal = principalService.acquirePrincipal(request);
- final var params = new RequestParameters(baseURI, request, principal,
+ final var params = new RequestParameters(baseUri, request, principal,
errorTagMapping, defaultAcceptType, defaultPrettyPrint);
try {
switch (params.pathParameters().apiResource()) {
--- /dev/null
+/*
+ * Copyright (c) 2024 PANTHEON.tech, s.r.o. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.restconf.server;
+
+import static java.util.Objects.requireNonNull;
+
+import com.google.common.util.concurrent.FutureCallback;
+import io.netty.buffer.ByteBufUtil;
+import io.netty.buffer.Unpooled;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.SimpleChannelInboundHandler;
+import io.netty.handler.codec.http.DefaultFullHttpResponse;
+import io.netty.handler.codec.http.FullHttpRequest;
+import io.netty.handler.codec.http.FullHttpResponse;
+import io.netty.handler.codec.http.HttpHeaderNames;
+import io.netty.handler.codec.http.HttpHeaderValues;
+import io.netty.handler.codec.http.HttpResponseStatus;
+import io.netty.handler.codec.http2.HttpConversionUtil.ExtensionHeaderNames;
+import io.netty.util.AsciiString;
+import org.opendaylight.restconf.server.api.TransportSession;
+
+/**
+ * A RESTCONF session, as defined in <a href="https://www.rfc-editor.org/rfc/rfc8650#section-3.1">RFC8650</a>. It acts
+ * as glue between a Netty channel and a RESTCONF server and may be servicing one (HTTP/1.1) or more (HTTP/2) logical
+ * connections.
+ */
+final class RestconfSession extends SimpleChannelInboundHandler<FullHttpRequest> implements TransportSession {
+ private static final AsciiString STREAM_ID = ExtensionHeaderNames.STREAM_ID.text();
+
+ private final RestconfRequestDispatcher dispatcher;
+
+ RestconfSession(final RestconfRequestDispatcher dispatcher) {
+ this.dispatcher = requireNonNull(dispatcher);
+ }
+
+ @Override
+ protected void channelRead0(final ChannelHandlerContext ctx, final FullHttpRequest msg) {
+ dispatcher.dispatch(msg.retain(), new FutureCallback<>() {
+ @Override
+ public void onSuccess(final FullHttpResponse response) {
+ final var streamId = msg.headers().getInt(STREAM_ID);
+ if (streamId != null) {
+ response.headers().setInt(STREAM_ID, streamId);
+ }
+ msg.release();
+ ctx.writeAndFlush(response);
+ }
+
+ @Override
+ public void onFailure(final Throwable throwable) {
+ final var message = throwable.getMessage();
+ final var content = message == null ? Unpooled.EMPTY_BUFFER
+ : ByteBufUtil.writeUtf8(ctx.alloc(), message);
+ final var response = new DefaultFullHttpResponse(msg.protocolVersion(),
+ HttpResponseStatus.INTERNAL_SERVER_ERROR, content);
+ response.headers()
+ .set(HttpHeaderNames.CONTENT_TYPE, HttpHeaderValues.TEXT_PLAIN)
+ .setInt(HttpHeaderNames.CONTENT_LENGTH, content.readableBytes());
+ onSuccess(response);
+ }
+ });
+ }
+}
import org.opendaylight.netconf.transport.api.TransportChannel;
import org.opendaylight.netconf.transport.api.TransportChannelListener;
+import org.opendaylight.netconf.transport.http.HTTPServer;
import org.opendaylight.netconf.transport.http.SseUtils;
import org.opendaylight.restconf.server.api.RestconfServer;
import org.opendaylight.restconf.server.spi.RestconfStream;
private final RestconfStream.Registry streamRegistry;
private final NettyEndpointConfiguration configuration;
+ private final RestconfRequestDispatcher dispatcher;
- RestconfTransportChannelListener(final RestconfStream.Registry streamRegistry,
- final NettyEndpointConfiguration configuration) {
+ RestconfTransportChannelListener(final RestconfServer server, final RestconfStream.Registry streamRegistry,
+ final PrincipalService principalService, final NettyEndpointConfiguration configuration) {
this.streamRegistry = requireNonNull(streamRegistry);
this.configuration = requireNonNull(configuration);
+ dispatcher = new RestconfRequestDispatcher(server, principalService, configuration.baseUri(),
+ configuration.errorTagMapping(), configuration.defaultAcceptType(), configuration.prettyPrint());
}
@Override
public void onTransportChannelEstablished(final TransportChannel channel) {
- SseUtils.enableServerSse(channel.channel(),
+ final var nettyChannel = channel.channel();
+
+ nettyChannel.pipeline().addLast(HTTPServer.REQUEST_DISPATCHER_HANDLER_NAME, new RestconfSession(dispatcher));
+
+ SseUtils.enableServerSse(nettyChannel,
new RestconfStreamService(streamRegistry, configuration.baseUri(), configuration.errorTagMapping(),
configuration.defaultAcceptType(), configuration.prettyPrint()),
configuration.sseMaximumFragmentLength().toJava(), configuration.sseHeartbeatIntervalMillis().toJava());
import org.mockito.Captor;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
-import org.opendaylight.netconf.transport.http.RequestDispatcher;
import org.opendaylight.restconf.api.ApiPath;
import org.opendaylight.restconf.api.query.PrettyPrintParam;
import org.opendaylight.restconf.server.TestUtils.TestEncoding;
@Captor
private ArgumentCaptor<FullHttpResponse> responseCaptor;
- private RequestDispatcher dispatcher;
+ private RestconfRequestDispatcher dispatcher;
@BeforeEach
void beforeEach() {
*/
package org.opendaylight.netconf.transport.http;
-import static io.netty.buffer.Unpooled.EMPTY_BUFFER;
-import static io.netty.handler.codec.http.HttpHeaderNames.CONTENT_LENGTH;
-import static io.netty.handler.codec.http.HttpHeaderNames.CONTENT_TYPE;
-import static io.netty.handler.codec.http.HttpHeaderValues.TEXT_PLAIN;
-import static io.netty.handler.codec.http.HttpResponseStatus.INTERNAL_SERVER_ERROR;
import static java.util.Objects.requireNonNull;
-import static org.opendaylight.netconf.transport.http.Http2Utils.copyStreamId;
-import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.ListenableFuture;
import io.netty.bootstrap.ServerBootstrap;
-import io.netty.buffer.Unpooled;
-import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPipeline;
-import io.netty.channel.SimpleChannelInboundHandler;
-import io.netty.handler.codec.http.DefaultFullHttpResponse;
-import io.netty.handler.codec.http.FullHttpRequest;
-import io.netty.handler.codec.http.FullHttpResponse;
import io.netty.handler.codec.http2.Http2ConnectionHandler;
-import java.nio.charset.StandardCharsets;
import org.eclipse.jdt.annotation.NonNull;
import org.eclipse.jdt.annotation.Nullable;
import org.opendaylight.netconf.transport.api.TransportChannel;
* A {@link HTTPTransportStack} acting as a server.
*/
public abstract sealed class HTTPServer extends HTTPTransportStack permits PlainHTTPServer, TlsHTTPServer {
- static final String REQUEST_DISPATCHER_HANDLER_NAME = "request-dispatcher";
+ public static final String REQUEST_DISPATCHER_HANDLER_NAME = "request-dispatcher";
private final AuthHandlerFactory authHandlerFactory;
- private final @NonNull RequestDispatcher dispatcher;
- HTTPServer(final TransportChannelListener listener, final RequestDispatcher dispatcher,
- final AuthHandlerFactory authHandlerFactory) {
+ HTTPServer(final TransportChannelListener listener, final AuthHandlerFactory authHandlerFactory) {
super(listener);
- this.dispatcher = requireNonNull(dispatcher);
this.authHandlerFactory = authHandlerFactory;
}
* @param listener {@link TransportChannelListener} to notify when the session is established
* @param bootstrap {@link ServerBootstrap} to use for the underlying Netty server channel
* @param listenParams Listening parameters
- * @param dispatcher server logic layer implementation as {@link RequestDispatcher}
* @return A future
* @throws UnsupportedConfigurationException when {@code listenParams} contains an unsupported options
* @throws NullPointerException if any argument is {@code null}
*/
public static final @NonNull ListenableFuture<HTTPServer> listen(final TransportChannelListener listener,
- final ServerBootstrap bootstrap, final HttpServerStackGrouping listenParams,
- final RequestDispatcher dispatcher) throws UnsupportedConfigurationException {
- return listen(listener, bootstrap, listenParams, dispatcher, null);
+ final ServerBootstrap bootstrap, final HttpServerStackGrouping listenParams)
+ throws UnsupportedConfigurationException {
+ return listen(listener, bootstrap, listenParams, null);
}
/**
* @param listener {@link TransportChannelListener} to notify when the session is established
* @param bootstrap {@link ServerBootstrap} to use for the underlying Netty server channel
* @param listenParams Listening parameters
- * @param dispatcher server logic layer implementation as {@link RequestDispatcher}
* @param authHandlerFactory {@link AuthHandlerFactory} instance, provides channel handler serving the request
* authentication; optional, if defined the Basic Auth settings of listenParams will be ignored
* @return A future
*/
public static final @NonNull ListenableFuture<HTTPServer> listen(final TransportChannelListener listener,
final ServerBootstrap bootstrap, final HttpServerStackGrouping listenParams,
- final RequestDispatcher dispatcher, final @Nullable AuthHandlerFactory authHandlerFactory)
- throws UnsupportedConfigurationException {
+ final @Nullable AuthHandlerFactory authHandlerFactory) throws UnsupportedConfigurationException {
final var transport = requireNonNull(listenParams).getTransport();
return switch (transport) {
- case Tcp tcpCase -> listen(listener, bootstrap, tcpCase, dispatcher, authHandlerFactory);
- case Tls tlsCase -> listen(listener, bootstrap, tlsCase, dispatcher, authHandlerFactory);
+ case Tcp tcpCase -> listen(listener, bootstrap, tcpCase, authHandlerFactory);
+ case Tls tlsCase -> listen(listener, bootstrap, tlsCase, authHandlerFactory);
default -> throw new UnsupportedConfigurationException("Unsupported transport: " + transport);
};
}
private static @NonNull ListenableFuture<HTTPServer> listen(final TransportChannelListener listener,
- final ServerBootstrap bootstrap, final Tcp tcpCase, final RequestDispatcher dispatcher,
- final @Nullable AuthHandlerFactory authHandlerFactory) throws UnsupportedConfigurationException {
+ final ServerBootstrap bootstrap, final Tcp tcpCase, final @Nullable AuthHandlerFactory authHandlerFactory)
+ throws UnsupportedConfigurationException {
final var tcp = tcpCase.getTcp();
- final var server = new PlainHTTPServer(listener, dispatcher, authHandlerFactory != null ? authHandlerFactory
+ final var server = new PlainHTTPServer(listener, authHandlerFactory != null ? authHandlerFactory
: BasicAuthHandlerFactory.ofNullable(tcp.getHttpServerParameters()));
return transformUnderlay(server,
TCPServer.listen(server.asListener(), bootstrap, tcp.nonnullTcpServerParameters()));
}
private static @NonNull ListenableFuture<HTTPServer> listen(final TransportChannelListener listener,
- final ServerBootstrap bootstrap, final Tls tlsCase, final RequestDispatcher dispatcher,
- final @Nullable AuthHandlerFactory authHandlerFactory) throws UnsupportedConfigurationException {
+ final ServerBootstrap bootstrap, final Tls tlsCase, final @Nullable AuthHandlerFactory authHandlerFactory)
+ throws UnsupportedConfigurationException {
final var tls = tlsCase.getTls();
- final var server = new TlsHTTPServer(listener, dispatcher, authHandlerFactory != null ? authHandlerFactory
+ final var server = new TlsHTTPServer(listener, authHandlerFactory != null ? authHandlerFactory
: BasicAuthHandlerFactory.ofNullable(tls.getHttpServerParameters()));
return transformUnderlay(server,
TLSServer.listen(server.asListener(), bootstrap, tls.nonnullTcpServerParameters(),
pipeline.addLast(authHandlerFactory.create());
}
- pipeline.addLast(REQUEST_DISPATCHER_HANDLER_NAME, new SimpleChannelInboundHandler<FullHttpRequest>() {
- @Override
- protected void channelRead0(final ChannelHandlerContext ctx, final FullHttpRequest request) {
- dispatcher.dispatch(request.retain(), new FutureCallback<>() {
- @Override
- public void onSuccess(final FullHttpResponse response) {
- copyStreamId(request, response);
- request.release();
- ctx.writeAndFlush(response);
- }
-
- @Override
- public void onFailure(final Throwable throwable) {
- final var message = throwable.getMessage();
- final var content = message == null ? EMPTY_BUFFER
- : Unpooled.wrappedBuffer(message.getBytes(StandardCharsets.UTF_8));
- final var response = new DefaultFullHttpResponse(request.protocolVersion(),
- INTERNAL_SERVER_ERROR, content);
- response.headers()
- .set(CONTENT_TYPE, TEXT_PLAIN)
- .setInt(CONTENT_LENGTH, response.content().readableBytes());
- onSuccess(response);
- }
- });
- }
- });
-
addTransportChannel(new HTTPTransportChannel(underlayChannel));
}
* An {@link HTTPServer} operating over plain TCP.
*/
final class PlainHTTPServer extends HTTPServer {
- PlainHTTPServer(final TransportChannelListener listener, final RequestDispatcher dispatcher,
- final AuthHandlerFactory authHandlerFactory) {
- super(listener, dispatcher, authHandlerFactory);
+ PlainHTTPServer(final TransportChannelListener listener, final AuthHandlerFactory authHandlerFactory) {
+ super(listener, authHandlerFactory);
}
@Override
/**
* Functional interface for HTTP request dispatcher.
+ *
+ * @deprecated This interface is an Operations layer concern (i.e. request/response semantics). As such it is a clear
+ * layering violation as {@code org.opendaylight.netconf.transport} deals only with the Secure Transport
+ * layer.
*/
@NonNullByDefault
@FunctionalInterface
+@Deprecated(forRemoval = true, since = "8.0.2")
public interface RequestDispatcher {
/**
* Performs {@link FullHttpRequest} processing. Any error occurred is expected either to be returned within
* An {@link HTTPServer} operating over TLS.
*/
final class TlsHTTPServer extends HTTPServer {
- TlsHTTPServer(final TransportChannelListener listener, final RequestDispatcher dispatcher,
- final AuthHandlerFactory authHandlerFactory) {
- super(listener, dispatcher, authHandlerFactory);
+ TlsHTTPServer(final TransportChannelListener listener, final AuthHandlerFactory authHandlerFactory) {
+ super(listener, authHandlerFactory);
}
@Override
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.timeout;
import static org.mockito.Mockito.verify;
import static org.opendaylight.netconf.transport.http.TestUtils.generateX509CertData;
import static org.opendaylight.netconf.transport.http.TestUtils.invoke;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.DefaultFullHttpRequest;
import io.netty.handler.codec.http.DefaultFullHttpResponse;
+import io.netty.handler.codec.http.FullHttpRequest;
import io.netty.handler.codec.http.HttpMethod;
import java.net.InetAddress;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.atomic.AtomicInteger;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
+import org.opendaylight.netconf.transport.api.TransportChannel;
import org.opendaylight.netconf.transport.api.TransportChannelListener;
import org.opendaylight.netconf.transport.tcp.BootstrapFactory;
import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.http.client.rev240208.HttpClientStackGrouping;
};
private static ScheduledExecutorService scheduledExecutor;
- private static RequestDispatcher requestDispatcher;
private static BootstrapFactory bootstrapFactory;
private static String localAddress;
bootstrapFactory = new BootstrapFactory("IntegrationTest", 0);
localAddress = InetAddress.getLoopbackAddress().getHostAddress();
scheduledExecutor = Executors.newSingleThreadScheduledExecutor();
-
- requestDispatcher = (request, callback) -> {
- // emulate asynchronous server request processing - run in separate thread with 100 millis delay
- scheduledExecutor.schedule(() -> {
- // return 200 response with a content built from request parameters
- final var method = request.method().name();
- final var uri = request.uri();
- final var payload = request.content().readableBytes() > 0
- ? request.content().toString(StandardCharsets.UTF_8) : "";
- final var responseMessage = RESPONSE_TEMPLATE.formatted(method, uri, payload);
- final var response = new DefaultFullHttpResponse(request.protocolVersion(), OK,
- wrappedBuffer(responseMessage.getBytes(StandardCharsets.UTF_8)));
- response.headers().set(CONTENT_TYPE, TEXT_PLAIN)
- .setInt(CONTENT_LENGTH, response.content().readableBytes());
- callback.onSuccess(response);
- }, 100, TimeUnit.MILLISECONDS);
- };
}
@AfterAll
scheduledExecutor.shutdown();
}
+ @BeforeEach
+ void beforeEach() {
+ doAnswer(inv -> {
+ inv.<TransportChannel>getArgument(0).channel().pipeline()
+ .addLast(new SimpleChannelInboundHandler<>(FullHttpRequest.class) {
+ @Override
+ protected void channelRead0(final ChannelHandlerContext ctx, final FullHttpRequest msg) {
+ // return 200 response with a content built from request parameters
+ final var method = msg.method().name();
+ final var uri = msg.uri();
+ final var payload = msg.content().readableBytes() > 0
+ ? msg.content().toString(StandardCharsets.UTF_8) : "";
+ final var responseMessage = RESPONSE_TEMPLATE.formatted(method, uri, payload);
+ final var response = new DefaultFullHttpResponse(msg.protocolVersion(), OK,
+ wrappedBuffer(responseMessage.getBytes(StandardCharsets.UTF_8)));
+ response.headers()
+ .set(CONTENT_TYPE, TEXT_PLAIN)
+ .setInt(CONTENT_LENGTH, response.content().readableBytes());
+ Http2Utils.copyStreamId(msg, response);
+
+ // emulate asynchronous server request processing - run in separate thread with 100 millis delay
+ scheduledExecutor.schedule(() -> ctx.writeAndFlush(response), 100, TimeUnit.MILLISECONDS);
+ }
+ });
+ return null;
+ }).when(serverTransportListener).onTransportChannelEstablished(any());
+ }
+
@ParameterizedTest(name = "TCP with no authorization, HTTP/2: {0}")
@ValueSource(booleans = {false, true})
void noAuthTcp(final boolean http2) throws Exception {
private void integrationTest(final boolean http2, final AuthHandlerFactory authHandlerFactory) throws Exception {
final var server = HTTPServer.listen(serverTransportListener, bootstrapFactory.newServerBootstrap(),
- serverConfig, requestDispatcher, authHandlerFactory).get(2, TimeUnit.SECONDS);
+ serverConfig, authHandlerFactory).get(2, TimeUnit.SECONDS);
try {
final var client = HTTPClient.connect(clientTransportListener, bootstrapFactory.newBootstrap(),
clientConfig, http2).get(2, TimeUnit.SECONDS);
try {
- verify(serverTransportListener, timeout(2000)).onTransportChannelEstablished(any());
verify(clientTransportListener, timeout(2000)).onTransportChannelEstablished(any());
+ verify(serverTransportListener, timeout(2000)).onTransportChannelEstablished(any());
for (var method : METHODS) {
final var uri = nextValue("URI");
final var payload = nextValue("PAYLOAD");
final var request = new DefaultFullHttpRequest(HTTP_1_1, HttpMethod.valueOf(method),
uri, wrappedBuffer(payload.getBytes(StandardCharsets.UTF_8)));
- request.headers().set(CONTENT_TYPE, TEXT_PLAIN)
+ request.headers()
+ .set(CONTENT_TYPE, TEXT_PLAIN)
.setInt(CONTENT_LENGTH, request.content().readableBytes())
// allow multiple requests on same connections
.set(CONNECTION, KEEP_ALIVE);
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.DefaultFullHttpRequest;
import io.netty.handler.codec.http.DefaultFullHttpResponse;
+import io.netty.handler.codec.http.FullHttpRequest;
import java.net.InetAddress;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
private static final List<String> DATA_VALUES = IntStream.rangeClosed(1, 10)
.mapToObj(num -> "value " + num).toList();
- private static RequestDispatcher requestDispatcher;
private static BootstrapFactory bootstrapFactory;
private static String localAddress;
static void beforeAll() {
bootstrapFactory = new BootstrapFactory("IntegrationTest", 0);
localAddress = InetAddress.getLoopbackAddress().getHostAddress();
- requestDispatcher = (request, callback) -> {
- final var response = DATA_URI.equals(request.uri())
- ? new DefaultFullHttpResponse(request.protocolVersion(), OK, OK_CONTENT.copy())
- : new DefaultFullHttpResponse(request.protocolVersion(), NOT_FOUND, Unpooled.EMPTY_BUFFER);
- response.headers().set(CONTENT_LENGTH, response.content().readableBytes());
- callback.onSuccess(response);
- };
}
@AfterAll
clientEventStreamService = null;
serverEventStreamService = new TestStreamService();
// init SSE layer on top of HTTP layer using Transport channel listeners
- serverTransportListener = new TestTransportListener(channel ->
- SseUtils.enableServerSse(channel, serverEventStreamService, 0, 0));
+ serverTransportListener = new TestTransportListener(channel -> {
+ channel.pipeline().addLast(HTTPServer.REQUEST_DISPATCHER_HANDLER_NAME,
+ new SimpleChannelInboundHandler<>(FullHttpRequest.class) {
+ @Override
+ protected void channelRead0(final ChannelHandlerContext ctx, final FullHttpRequest msg) {
+ final var response = DATA_URI.equals(msg.uri())
+ ? new DefaultFullHttpResponse(msg.protocolVersion(), OK, OK_CONTENT.copy())
+ : new DefaultFullHttpResponse(msg.protocolVersion(), NOT_FOUND, Unpooled.EMPTY_BUFFER);
+ response.headers().set(CONTENT_LENGTH, response.content().readableBytes());
+ Http2Utils.copyStreamId(msg, response);
+ ctx.writeAndFlush(response);
+ }
+ });
+ SseUtils.enableServerSse(channel, serverEventStreamService, 0, 0);
+ });
clientTransportListener = new TestTransportListener(channel ->
- SseClientServerTest.this.clientEventStreamService = SseUtils.enableClientSse(channel));
+ clientEventStreamService = SseUtils.enableClientSse(channel));
}
@ParameterizedTest(name = "TCP with no authorization, HTTP/2: {0}")
private void integrationTest(final boolean http2) throws Exception {
final var server = HTTPServer.listen(serverTransportListener, bootstrapFactory.newServerBootstrap(),
- serverConfig, requestDispatcher).get(2, TimeUnit.SECONDS);
+ serverConfig).get(2, TimeUnit.SECONDS);
try {
final var client = HTTPClient.connect(clientTransportListener, bootstrapFactory.newBootstrap(),
clientConfig, http2).get(2, TimeUnit.SECONDS);