Use channel allocator for Responses 66/114266/8
authorRobert Varga <robert.varga@pantheon.tech>
Wed, 30 Oct 2024 12:03:31 +0000 (13:03 +0100)
committerRobert Varga <robert.varga@pantheon.tech>
Thu, 7 Nov 2024 09:05:30 +0000 (10:05 +0100)
Refactor HTTPServerSession to correctly take advantage of ReadyResponse
and of channel's allocator -- leading to potential buffer reuse.

This also ensures we release the allocated buffer is released in case
of a writeout failure.

Change-Id: Ifb603aefee753fd9c0bc3c38f65e0eaa3cff2a6f
Signed-off-by: Robert Varga <robert.varga@pantheon.tech>
protocol/restconf-server/src/test/java/org/opendaylight/restconf/server/AbstractRequestProcessorTest.java
protocol/restconf-server/src/test/java/org/opendaylight/restconf/server/DataRequestProcessorTest.java
protocol/restconf-server/src/test/java/org/opendaylight/restconf/server/ErrorHandlerTest.java
protocol/restconf-server/src/test/java/org/opendaylight/restconf/server/ModulesRequestProcessorTest.java
protocol/restconf-server/src/test/java/org/opendaylight/restconf/server/OperationsRequestProcessorTest.java
transport/transport-http/src/main/java/org/opendaylight/netconf/transport/http/ByteStreamRequestResponse.java
transport/transport-http/src/main/java/org/opendaylight/netconf/transport/http/HTTPServerSession.java

index 14587ae24ebf01c35d927b849edb7e17af05b04f..6dd15b8a8107e81ce06636de041065b6e90215d4 100644 (file)
@@ -12,6 +12,7 @@ import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.verify;
 import static org.opendaylight.restconf.server.TestUtils.ERROR_TAG_MAPPING;
 
+import io.netty.buffer.UnpooledByteBufAllocator;
 import io.netty.channel.ChannelHandlerContext;
 import io.netty.handler.codec.http.FullHttpRequest;
 import io.netty.handler.codec.http.FullHttpResponse;
@@ -92,6 +93,11 @@ class AbstractRequestProcessorTest {
         return responseCaptor.getValue();
     }
 
+    protected final FullHttpResponse dispatchWithAlloc(final FullHttpRequest request) {
+        doReturn(UnpooledByteBufAllocator.DEFAULT).when(ctx).alloc();
+        return dispatch(request);
+    }
+
     protected static final List<Arguments> encodings() {
         return List.of(
             Arguments.of(TestEncoding.JSON, JSON_CONTENT),
index def93c83154b7a24af0019747e9ffe61730cc979..9c8a9bf06f80d56839f4f6576f1ea1b13515831e 100644 (file)
@@ -169,7 +169,7 @@ class DataRequestProcessorTest extends AbstractRequestProcessorTest {
         doAnswer(answerCompleteWith(result)).when(server).dataGET(any());
 
         final var request = buildRequest(HttpMethod.GET, DATA_PATH, encoding, null);
-        final var response = dispatch(request);
+        final var response = dispatchWithAlloc(request);
 
         assertResponse(response, HttpResponseStatus.OK, encoding.responseType, content);
         assertResponseHeaders(response, Map.of(HttpHeaderNames.CACHE_CONTROL, HttpHeaderValues.NO_CACHE));
@@ -183,7 +183,7 @@ class DataRequestProcessorTest extends AbstractRequestProcessorTest {
         doAnswer(answerCompleteWith(result)).when(server).dataGET(any(), any(ApiPath.class));
 
         final var request = buildRequest(HttpMethod.GET, DATA_PATH_WITH_ID, encoding, null);
-        final var response = dispatch(request);
+        final var response = dispatchWithAlloc(request);
         verify(server).dataGET(any(), apiPathCaptor.capture());
 
         assertEquals(API_PATH, apiPathCaptor.getValue());
@@ -237,7 +237,7 @@ class DataRequestProcessorTest extends AbstractRequestProcessorTest {
         doAnswer(answer).when(server).dataPOST(any(), any(ApiPath.class), any(DataPostBody.class));
 
         final var request = buildRequest(HttpMethod.POST, DATA_PATH_WITH_ID, encoding, content);
-        final var response = dispatch(request);
+        final var response = dispatchWithAlloc(request);
         verify(server).dataPOST(any(), apiPathCaptor.capture(), any());
 
         assertEquals(API_PATH, apiPathCaptor.getValue());
@@ -323,7 +323,7 @@ class DataRequestProcessorTest extends AbstractRequestProcessorTest {
         doAnswer(answer).when(server).dataPATCH(any(), any(PatchBody.class));
 
         final var request = buildRequest(HttpMethod.PATCH, DATA_PATH, encoding, input);
-        final var response = dispatch(request);
+        final var response = dispatchWithAlloc(request);
         answer.assertContent(input);
 
         final var expectedStatus = expectedErrorTag == null ? HttpResponseStatus.OK
index ad1b097474e5c1d03060f688eb8a230e57542d75..ba4b0117653f0587d1d7e0e3b2cc941833a85fc7 100644 (file)
@@ -150,7 +150,7 @@ class ErrorHandlerTest extends AbstractRequestProcessorTest {
         }
 
         final var request = buildRequest(HttpMethod.GET, DATA_PATH, encoding, null);
-        final var response = dispatch(request);
+        final var response = dispatchWithAlloc(request);
         assertEquals(HttpResponseStatus.INTERNAL_SERVER_ERROR, response.status());
         assertEquals("encoding-error", response.content().toString(StandardCharsets.UTF_8));
     }
index ac01074198871cbfb82b112dca8565b7ae4c408e..7f573fd5477a43b9e9979c77066637b4888db91d 100644 (file)
@@ -70,7 +70,7 @@ class ModulesRequestProcessorTest extends AbstractRequestProcessorTest {
         doAnswer(answerCompleteWith(result)).when(server).yangLibraryVersionGET(any());
 
         final var request = buildRequest(HttpMethod.GET, YANG_LIBRARY_VERSION_URI, encoding, null);
-        final var response = dispatch(request);
+        final var response = dispatchWithAlloc(request);
         assertResponse(response, HttpResponseStatus.OK, encoding.responseType, content);
     }
 
@@ -87,7 +87,7 @@ class ModulesRequestProcessorTest extends AbstractRequestProcessorTest {
 
         final var uri = MODULE_URI + (hasRevision ? REVISION_PARAM : "");
         final var request = buildRequest(HttpMethod.GET, uri, encoding, null);
-        final var response = dispatch(request);
+        final var response = dispatchWithAlloc(request);
         assertResponse(response, HttpResponseStatus.OK, encoding.responseType, content);
     }
 
@@ -105,7 +105,7 @@ class ModulesRequestProcessorTest extends AbstractRequestProcessorTest {
         }
         final var uri = MODULE_URI_WITH_MOUNT + (hasRevision ? REVISION_PARAM : "");
         final var request = buildRequest(HttpMethod.GET, uri, encoding, null);
-        final var response = dispatch(request);
+        final var response = dispatchWithAlloc(request);
         assertResponse(response, HttpResponseStatus.OK, encoding.responseType, content);
     }
 
@@ -142,7 +142,7 @@ class ModulesRequestProcessorTest extends AbstractRequestProcessorTest {
         }
 
         final var request = buildRequest(HttpMethod.GET, MODULE_URI, encoding, null);
-        final var response = dispatch(request);
+        final var response = dispatchWithAlloc(request);
         assertEquals(HttpResponseStatus.INTERNAL_SERVER_ERROR, response.status());
         assertEquals("source-read-failure", response.content().toString(StandardCharsets.UTF_8));
     }
index d4ef90b83ef85458652224cbb3d91d00e674fb91..221a786744582da5fb9692192a131931c5b7a267 100644 (file)
@@ -68,7 +68,7 @@ class OperationsRequestProcessorTest extends AbstractRequestProcessorTest {
         doAnswer(answerCompleteWith(result)).when(server).operationsGET(any());
 
         final var request = buildRequest(HttpMethod.GET, OPERATIONS_PATH, encoding, null);
-        final var response = dispatch(request);
+        final var response = dispatchWithAlloc(request);
         assertResponse(response, HttpResponseStatus.OK, encoding.responseType, content);
     }
 
@@ -79,7 +79,7 @@ class OperationsRequestProcessorTest extends AbstractRequestProcessorTest {
         doAnswer(answerCompleteWith(result)).when(server).operationsGET(any(), any(ApiPath.class));
 
         final var request = buildRequest(HttpMethod.GET, OPERATIONS_PATH_WITH_ID, encoding, null);
-        final var response = dispatch(request);
+        final var response = dispatchWithAlloc(request);
         verify(server).operationsGET(any(), apiPathCaptor.capture());
 
         assertEquals(API_PATH, apiPathCaptor.getValue());
@@ -95,7 +95,7 @@ class OperationsRequestProcessorTest extends AbstractRequestProcessorTest {
         doAnswer(answer).when(server).operationsPOST(any(), any(), any(ApiPath.class), any(OperationInputBody.class));
 
         final var request = buildRequest(HttpMethod.POST, OPERATIONS_PATH_WITH_ID, encoding, input);
-        final var response = dispatch(request);
+        final var response = output == null ? dispatch(request) : dispatchWithAlloc(request);
         verify(server).operationsPOST(any(), eq(RESTCONF_URI), apiPathCaptor.capture(), any());
 
         assertEquals(API_PATH, apiPathCaptor.getValue());
index 0fa97b93bd78dde8cd99c435e4fda8c88a64a348..45eedab670938d12563fa9c71661bfa518877427 100644 (file)
@@ -41,6 +41,9 @@ public abstract class ByteStreamRequestResponse extends AbstractRequestResponse
         final var content = alloc.buffer();
         try (var out = new ByteBufOutputStream(content)) {
             writeBody(out);
+        } catch (IOException e) {
+            content.release();
+            throw e;
         }
         return toHttpResponse(version, content);
     }
index 7ff7911b50a23bd39cc8ccaaf864956d507e3b06..2bb3c183a7abdbfc480a0c5620b2737141aeae4b 100644 (file)
@@ -60,15 +60,11 @@ public abstract class HTTPServerSession extends SimpleChannelInboundHandler<Full
      * @param version HTTP version of the request
      */
     @NonNullByDefault
-    public record RequestContext(ChannelHandlerContext ctx, HttpVersion version, @Nullable Integer streamId) {
-        public RequestContext {
+    private record RequestContext(ChannelHandlerContext ctx, HttpVersion version, @Nullable Integer streamId) {
+        RequestContext {
             requireNonNull(ctx);
             requireNonNull(version);
         }
-
-        public void respond(final FullHttpResponse response) {
-            HTTPServerSession.respond(ctx, streamId, response);
-        }
     }
 
     private static final Logger LOG = LoggerFactory.getLogger(HTTPServerSession.class);
@@ -195,7 +191,7 @@ public abstract class HTTPServerSession extends SimpleChannelInboundHandler<Full
         switch (prepareRequest(method, targetUri, msg.headers())) {
             case CompletedRequest completed -> {
                 msg.release();
-                respond(ctx, streamId, formatResponse(completed.asResponse(), version));
+                respond(ctx, streamId, version, completed.asResponse());
             }
             case PendingRequest<?> pending -> {
                 LOG.debug("Dispatching {} {}", method, targetUri);
@@ -266,9 +262,9 @@ public abstract class HTTPServerSession extends SimpleChannelInboundHandler<Full
 
     @Override
     public void requestComplete(final PendingRequest<?> request, final Response response) {
-        final var context = executingRequests.remove(request);
-        if (context != null) {
-            context.respond(formatResponse(response, context.version()));
+        final var req = executingRequests.remove(request);
+        if (req != null) {
+            respond(req.ctx, req.streamId, req.version, response);
         } else {
             LOG.warn("Cannot pair request {}, not sending response {}", request, response, new Throwable());
         }
@@ -277,14 +273,33 @@ public abstract class HTTPServerSession extends SimpleChannelInboundHandler<Full
     @Override
     public void requestFailed(final PendingRequest<?> request, final Exception cause) {
         LOG.warn("Internal error while processing {}", request, cause);
-        final var context = executingRequests.remove(request);
-        if (context != null) {
-            context.respond(formatException(cause, context.version()));
+        final var req = executingRequests.remove(request);
+        if (req != null) {
+            respond(req.ctx, req.streamId, formatException(cause, req.version()));
         } else {
             LOG.warn("Cannot pair request, not sending response", new Throwable());
         }
     }
 
+    @NonNullByDefault
+    private static void respond(final ChannelHandlerContext ctx, final @Nullable Integer streamId,
+            final HttpVersion version, final Response response) {
+        respond(ctx, streamId, switch (response) {
+            case ReadyResponse ready -> ready.toHttpResponse(version);
+            default -> formatResponse(response, ctx, version);
+        });
+    }
+
+    @NonNullByDefault
+    private static void respond(final ChannelHandlerContext ctx, final @Nullable Integer streamId,
+            final FullHttpResponse response) {
+        requireNonNull(response);
+        if (streamId != null) {
+            response.headers().setInt(STREAM_ID, streamId);
+        }
+        ctx.writeAndFlush(response);
+    }
+
     // FIXME: below payloads use a synchronous dump of data into the socket. We cannot safely do that on the event loop,
     //        because a slow client would end up throttling our IO threads simply because of TCP window and similar
     //        queuing/backpressure things.
@@ -293,7 +308,7 @@ public abstract class HTTPServerSession extends SimpleChannelInboundHandler<Full
     //        thing, talking to a short queue (SPSC?) of HttpObjects.
     //
     //        the event loop of each channel would be the consumer of that queue, picking them off as quickly as
-    //        possible, but execting backpressure if the amount of pending stuff goes up.
+    //        possible, but expecting backpressure if the amount of pending stuff goes up.
     //
     //        as for the HttpObjects: this effectively means that the OutputStreams used in the below code should be
     //        replaced with entities which perform chunking:
@@ -304,10 +319,10 @@ public abstract class HTTPServerSession extends SimpleChannelInboundHandler<Full
     //        - finish up with a LastHttpContent
 
     @NonNullByDefault
-    private static FullHttpResponse formatResponse(final Response response, final HttpVersion version) {
+    private static FullHttpResponse formatResponse(final Response response, final ChannelHandlerContext ctx,
+            final HttpVersion version) {
         try {
-            // FIXME: require ChannelHandlerContext and use its allocator
-            return response.toHttpResponse(version);
+            return response.toHttpResponse(ctx.alloc(), version);
         } catch (IOException e) {
             LOG.warn("IO error while converting formatting response", e);
             return formatException(e, version);
@@ -324,14 +339,4 @@ public abstract class HTTPServerSession extends SimpleChannelInboundHandler<Full
         HttpUtil.setContentLength(response, content.readableBytes());
         return response;
     }
-
-    @NonNullByDefault
-    protected static final void respond(final ChannelHandlerContext ctx, final @Nullable Integer streamId,
-            final FullHttpResponse response) {
-        requireNonNull(response);
-        if (streamId != null) {
-            response.headers().setInt(STREAM_ID, streamId);
-        }
-        ctx.writeAndFlush(response);
-    }
 }