2 * Copyright (c) 2024 PANTHEON.tech, s.r.o. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.netconf.transport.http;
10 import static java.util.Objects.requireNonNull;
12 import io.netty.buffer.ByteBuf;
13 import io.netty.buffer.ByteBufInputStream;
14 import io.netty.buffer.ByteBufUtil;
15 import io.netty.channel.ChannelHandlerContext;
16 import io.netty.handler.codec.http.DefaultFullHttpResponse;
17 import io.netty.handler.codec.http.FullHttpResponse;
18 import io.netty.handler.codec.http.HttpResponseStatus;
19 import io.netty.handler.codec.http.HttpUtil;
20 import io.netty.handler.codec.http.HttpVersion;
21 import java.io.IOException;
22 import java.util.concurrent.ConcurrentHashMap;
23 import java.util.concurrent.ExecutorService;
24 import java.util.concurrent.Executors;
25 import java.util.concurrent.RejectedExecutionException;
26 import org.eclipse.jdt.annotation.NonNullByDefault;
27 import org.eclipse.jdt.annotation.Nullable;
28 import org.slf4j.Logger;
29 import org.slf4j.LoggerFactory;
32 * Asynchronous task executor. Usually tied to {@link HTTPServerSession}.
34 final class ServerRequestExecutor implements PendingRequestListener {
36 * Transport-level details about a {@link PendingRequest} execution.
38 * @param ctx the {@link ChannelHandlerContext} on which the request is occuring
39 * @param streamId the HTTP/2 stream ID, if present
40 * @param version HTTP version of the request
43 private record RequestContext(ChannelHandlerContext ctx, HttpVersion version, @Nullable Integer streamId) {
46 requireNonNull(version);
50 private static final Logger LOG = LoggerFactory.getLogger(ServerRequestExecutor.class);
52 private final ConcurrentHashMap<PendingRequest<?>, RequestContext> pendingRequests = new ConcurrentHashMap<>();
53 private final ExecutorService reqExecutor;
54 private final ExecutorService respExecutor;
56 ServerRequestExecutor(final String threadNamePrefix) {
57 reqExecutor = Executors.newThreadPerTaskExecutor(Thread.ofVirtual()
58 .name(threadNamePrefix + "-http-server-req-", 0)
59 .inheritInheritableThreadLocals(false)
60 .uncaughtExceptionHandler(
61 (thread, exception) -> LOG.warn("Unhandled request-phase failure", exception))
63 respExecutor = Executors.newThreadPerTaskExecutor(Thread.ofVirtual()
64 .name(threadNamePrefix + "-http-server-resp-", 0)
65 .inheritInheritableThreadLocals(false)
66 .uncaughtExceptionHandler(
67 (thread, exception) -> LOG.warn("Unhandled response-phase failure", exception))
72 reqExecutor.shutdown();
73 respExecutor.shutdown();
76 void executeRequest(final ChannelHandlerContext ctx, final HttpVersion version, final Integer streamId,
77 final PendingRequest<?> pending, final ByteBuf content) {
78 // We are invoked with content's reference and need to make sure it gets released.
79 final var context = new RequestContext(ctx, version, streamId);
80 if (content.isReadable()) {
81 executeRequest(context, pending, new ByteBufInputStream(content, true));
84 executeRequest(context, pending, (ByteBufInputStream) null);
88 private void executeRequest(final RequestContext context, final PendingRequest<?> pending,
89 final ByteBufInputStream body) {
90 // Remember metadata about the request and then execute it
91 pendingRequests.put(pending, context);
92 reqExecutor.execute(() -> pending.execute(this, body));
96 public void requestComplete(final PendingRequest<?> request, final Response response) {
97 final var req = pendingRequests.remove(request);
99 respond(req.ctx, req.streamId, req.version, response);
101 LOG.warn("Cannot pair request {}, not sending response {}", request, response, new Throwable());
106 public void requestFailed(final PendingRequest<?> request, final Exception cause) {
107 LOG.warn("Internal error while processing {}", request, cause);
108 final var req = pendingRequests.remove(request);
110 HTTPServerSession.respond(req.ctx, req.streamId, formatException(cause, req.version()));
112 LOG.warn("Cannot pair request, not sending response", new Throwable());
117 void respond(final ChannelHandlerContext ctx, final @Nullable Integer streamId, final HttpVersion version,
118 final Response response) {
120 case ReadyResponse resp -> HTTPServerSession.respond(ctx, streamId, resp.toHttpResponse(version));
121 case FiniteResponse resp -> respond(ctx, streamId, version, resp);
126 private void respond(final ChannelHandlerContext ctx, final @Nullable Integer streamId, final HttpVersion version,
127 final FiniteResponse response) {
129 respExecutor.execute(
130 () -> HTTPServerSession.respond(ctx, streamId, formatResponse(response, ctx, version)));
131 } catch (RejectedExecutionException e) {
132 LOG.trace("Session shut down, dropping response {}", response, e);
136 // Hand-coded, as simple as possible
138 private static FullHttpResponse formatException(final Exception cause, final HttpVersion version) {
139 final var message = new DefaultFullHttpResponse(version, HttpResponseStatus.INTERNAL_SERVER_ERROR);
140 final var content = message.content();
141 // Note: we are tempted to do a cause.toString() here, but we are dealing with unhandled badness here,
142 // so we do not want to be too revealing -- hence a message is all the user gets.
143 ByteBufUtil.writeUtf8(content, cause.getMessage());
144 HttpUtil.setContentLength(message, content.readableBytes());
148 // Executed on respExecutor, so it is okay to block
150 private static FullHttpResponse formatResponse(final FiniteResponse response, final ChannelHandlerContext ctx,
151 final HttpVersion version) {
152 // FIXME: We are filling a full ByteBuf and producing a complete FullHttpResponse, which is not want we want.
154 // We want to be emitting a series of write() requests into the queue, each of which is subject
155 // to channel's flow control -- i.e. it effectively is a MPSC outbound queue.
157 // We are using OutputStream below as the synchronous interface, but we really want our own, such that we
158 // get the headers first and we invoke body streaming (if applicable and indicated by return).
160 // In the streaming phase, we start with a HttpObjectSender initialized to the HttpResponse, as indicated
161 // in previous step. It also implements OutputStream, which is the fasade we show to the user. As they
162 // pump body data, we check for body size and events and:
163 // - buffer initial stuff, so that we produce a FullHttpResponse if the payload is below
164 // 256KiB (or so), i.e. producing Content-Length header and dumping the thing in one go
165 // - otherwise emit just HttpResponse with Transfer-Enconding: chunked and continue sending
166 // out chunks (of reasonable size).
167 // - finish up with a LastHttpContent when OutputStream.close() is called ... which might be problematic
168 // w.r.t. failure cases, so it needs some figuring out
169 final ReadyResponse ready;
171 ready = response.toReadyResponse(ctx.alloc());
172 } catch (IOException e) {
173 LOG.warn("IO error while converting formatting response", e);
174 return formatException(e, version);
176 return ready.toHttpResponse(version);