f7b2e82b5c69fa8d15c0799a19cba3b8e7cc29e4
[netconf.git] /
1 /*
2  * Copyright (c) 2024 PANTHEON.tech, s.r.o. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.netconf.transport.http;
9
10 import static java.util.Objects.requireNonNull;
11
12 import io.netty.buffer.ByteBuf;
13 import io.netty.buffer.ByteBufInputStream;
14 import io.netty.buffer.ByteBufUtil;
15 import io.netty.channel.ChannelHandlerContext;
16 import io.netty.handler.codec.http.DefaultFullHttpResponse;
17 import io.netty.handler.codec.http.FullHttpResponse;
18 import io.netty.handler.codec.http.HttpResponseStatus;
19 import io.netty.handler.codec.http.HttpUtil;
20 import io.netty.handler.codec.http.HttpVersion;
21 import java.io.IOException;
22 import java.util.concurrent.ConcurrentHashMap;
23 import java.util.concurrent.ExecutorService;
24 import java.util.concurrent.Executors;
25 import java.util.concurrent.RejectedExecutionException;
26 import org.eclipse.jdt.annotation.NonNullByDefault;
27 import org.eclipse.jdt.annotation.Nullable;
28 import org.slf4j.Logger;
29 import org.slf4j.LoggerFactory;
30
31 /**
32  * Asynchronous task executor. Usually tied to {@link HTTPServerSession}.
33  */
34 final class ServerRequestExecutor implements PendingRequestListener {
35     /**
36      * Transport-level details about a {@link PendingRequest} execution.
37      *
38      * @param ctx the {@link ChannelHandlerContext} on which the request is occuring
39      * @param streamId the HTTP/2 stream ID, if present
40      * @param version HTTP version of the request
41      */
42     @NonNullByDefault
43     private record RequestContext(ChannelHandlerContext ctx, HttpVersion version, @Nullable Integer streamId) {
44         RequestContext {
45             requireNonNull(ctx);
46             requireNonNull(version);
47         }
48     }
49
50     private static final Logger LOG = LoggerFactory.getLogger(ServerRequestExecutor.class);
51
52     private final ConcurrentHashMap<PendingRequest<?>, RequestContext> pendingRequests = new ConcurrentHashMap<>();
53     private final ExecutorService reqExecutor;
54     private final ExecutorService respExecutor;
55
56     ServerRequestExecutor(final String threadNamePrefix) {
57         reqExecutor = Executors.newThreadPerTaskExecutor(Thread.ofVirtual()
58             .name(threadNamePrefix + "-http-server-req-", 0)
59             .inheritInheritableThreadLocals(false)
60             .uncaughtExceptionHandler(
61                 (thread, exception) -> LOG.warn("Unhandled request-phase failure", exception))
62             .factory());
63         respExecutor = Executors.newThreadPerTaskExecutor(Thread.ofVirtual()
64             .name(threadNamePrefix + "-http-server-resp-", 0)
65             .inheritInheritableThreadLocals(false)
66             .uncaughtExceptionHandler(
67                 (thread, exception) -> LOG.warn("Unhandled response-phase failure", exception))
68             .factory());
69     }
70
71     void shutdown() {
72         reqExecutor.shutdown();
73         respExecutor.shutdown();
74     }
75
76     void executeRequest(final ChannelHandlerContext ctx, final HttpVersion version, final Integer streamId,
77             final PendingRequest<?> pending, final ByteBuf content) {
78         // We are invoked with content's reference and need to make sure it gets released.
79         final var context = new RequestContext(ctx, version, streamId);
80         if (content.isReadable()) {
81             executeRequest(context, pending, new ByteBufInputStream(content, true));
82         } else {
83             content.release();
84             executeRequest(context, pending, (ByteBufInputStream) null);
85         }
86     }
87
88     private void executeRequest(final RequestContext context, final PendingRequest<?> pending,
89             final ByteBufInputStream body) {
90         // Remember metadata about the request and then execute it
91         pendingRequests.put(pending, context);
92         reqExecutor.execute(() -> pending.execute(this, body));
93     }
94
95     @Override
96     public void requestComplete(final PendingRequest<?> request, final Response response) {
97         final var req = pendingRequests.remove(request);
98         if (req != null) {
99             respond(req.ctx, req.streamId, req.version, response);
100         } else {
101             LOG.warn("Cannot pair request {}, not sending response {}", request, response, new Throwable());
102         }
103     }
104
105     @Override
106     public void requestFailed(final PendingRequest<?> request, final Exception cause) {
107         LOG.warn("Internal error while processing {}", request, cause);
108         final var req = pendingRequests.remove(request);
109         if (req != null) {
110             HTTPServerSession.respond(req.ctx, req.streamId, formatException(cause, req.version()));
111         } else {
112             LOG.warn("Cannot pair request, not sending response", new Throwable());
113         }
114     }
115
116     @NonNullByDefault
117     void respond(final ChannelHandlerContext ctx, final @Nullable Integer streamId, final HttpVersion version,
118             final Response response) {
119         switch (response) {
120             case ReadyResponse resp -> HTTPServerSession.respond(ctx, streamId, resp.toHttpResponse(version));
121             case FiniteResponse resp -> respond(ctx, streamId, version, resp);
122         }
123     }
124
125     @NonNullByDefault
126     private void respond(final ChannelHandlerContext ctx, final @Nullable Integer streamId, final HttpVersion version,
127             final FiniteResponse response) {
128         try {
129             respExecutor.execute(
130                 () -> HTTPServerSession.respond(ctx, streamId, formatResponse(response, ctx, version)));
131         } catch (RejectedExecutionException e) {
132             LOG.trace("Session shut down, dropping response {}", response, e);
133         }
134     }
135
136     // Hand-coded, as simple as possible
137     @NonNullByDefault
138     private static FullHttpResponse formatException(final Exception cause, final HttpVersion version) {
139         final var message = new DefaultFullHttpResponse(version, HttpResponseStatus.INTERNAL_SERVER_ERROR);
140         final var content = message.content();
141         // Note: we are tempted to do a cause.toString() here, but we are dealing with unhandled badness here,
142         //       so we do not want to be too revealing -- hence a message is all the user gets.
143         ByteBufUtil.writeUtf8(content, cause.getMessage());
144         HttpUtil.setContentLength(message, content.readableBytes());
145         return message;
146     }
147
148     // Executed on respExecutor, so it is okay to block
149     @NonNullByDefault
150     private static FullHttpResponse formatResponse(final FiniteResponse response, final ChannelHandlerContext ctx,
151             final HttpVersion version) {
152         // FIXME: We are filling a full ByteBuf and producing a complete FullHttpResponse, which is not want we want.
153         //
154         //        We want to be emitting a series of write() requests into the queue, each of which is subject
155         //        to channel's flow control -- i.e. it effectively is a MPSC outbound queue.
156         //
157         //        We are using OutputStream below as the synchronous interface, but we really want our own, such that we
158         //        get the headers first and we invoke body streaming (if applicable and indicated by return).
159         //
160         //        In the streaming phase, we start with a HttpObjectSender initialized to the HttpResponse, as indicated
161         //        in previous step. It also implements OutputStream, which is the fasade we show to the user. As they
162         //        pump body data, we check for body size and events and:
163         //        - buffer initial stuff, so that we produce a FullHttpResponse if the payload is below
164         //          256KiB (or so), i.e. producing Content-Length header and dumping the thing in one go
165         //        - otherwise emit just HttpResponse with Transfer-Enconding: chunked and continue sending
166         //          out chunks (of reasonable size).
167         //        - finish up with a LastHttpContent when OutputStream.close() is called ... which might be problematic
168         //          w.r.t. failure cases, so it needs some figuring out
169         final ReadyResponse ready;
170         try {
171             ready = response.toReadyResponse(ctx.alloc());
172         } catch (IOException e) {
173             LOG.warn("IO error while converting formatting response", e);
174             return formatException(e, version);
175         }
176         return ready.toHttpResponse(version);
177     }
178 }