import static org.mockito.Mockito.verify;
import static org.opendaylight.restconf.server.TestUtils.ERROR_TAG_MAPPING;
+import io.netty.buffer.UnpooledByteBufAllocator;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.http.FullHttpRequest;
import io.netty.handler.codec.http.FullHttpResponse;
return responseCaptor.getValue();
}
+ protected final FullHttpResponse dispatchWithAlloc(final FullHttpRequest request) {
+ doReturn(UnpooledByteBufAllocator.DEFAULT).when(ctx).alloc();
+ return dispatch(request);
+ }
+
protected static final List<Arguments> encodings() {
return List.of(
Arguments.of(TestEncoding.JSON, JSON_CONTENT),
doAnswer(answerCompleteWith(result)).when(server).dataGET(any());
final var request = buildRequest(HttpMethod.GET, DATA_PATH, encoding, null);
- final var response = dispatch(request);
+ final var response = dispatchWithAlloc(request);
assertResponse(response, HttpResponseStatus.OK, encoding.responseType, content);
assertResponseHeaders(response, Map.of(HttpHeaderNames.CACHE_CONTROL, HttpHeaderValues.NO_CACHE));
doAnswer(answerCompleteWith(result)).when(server).dataGET(any(), any(ApiPath.class));
final var request = buildRequest(HttpMethod.GET, DATA_PATH_WITH_ID, encoding, null);
- final var response = dispatch(request);
+ final var response = dispatchWithAlloc(request);
verify(server).dataGET(any(), apiPathCaptor.capture());
assertEquals(API_PATH, apiPathCaptor.getValue());
doAnswer(answer).when(server).dataPOST(any(), any(ApiPath.class), any(DataPostBody.class));
final var request = buildRequest(HttpMethod.POST, DATA_PATH_WITH_ID, encoding, content);
- final var response = dispatch(request);
+ final var response = dispatchWithAlloc(request);
verify(server).dataPOST(any(), apiPathCaptor.capture(), any());
assertEquals(API_PATH, apiPathCaptor.getValue());
doAnswer(answer).when(server).dataPATCH(any(), any(PatchBody.class));
final var request = buildRequest(HttpMethod.PATCH, DATA_PATH, encoding, input);
- final var response = dispatch(request);
+ final var response = dispatchWithAlloc(request);
answer.assertContent(input);
final var expectedStatus = expectedErrorTag == null ? HttpResponseStatus.OK
}
final var request = buildRequest(HttpMethod.GET, DATA_PATH, encoding, null);
- final var response = dispatch(request);
+ final var response = dispatchWithAlloc(request);
assertEquals(HttpResponseStatus.INTERNAL_SERVER_ERROR, response.status());
assertEquals("encoding-error", response.content().toString(StandardCharsets.UTF_8));
}
doAnswer(answerCompleteWith(result)).when(server).yangLibraryVersionGET(any());
final var request = buildRequest(HttpMethod.GET, YANG_LIBRARY_VERSION_URI, encoding, null);
- final var response = dispatch(request);
+ final var response = dispatchWithAlloc(request);
assertResponse(response, HttpResponseStatus.OK, encoding.responseType, content);
}
final var uri = MODULE_URI + (hasRevision ? REVISION_PARAM : "");
final var request = buildRequest(HttpMethod.GET, uri, encoding, null);
- final var response = dispatch(request);
+ final var response = dispatchWithAlloc(request);
assertResponse(response, HttpResponseStatus.OK, encoding.responseType, content);
}
}
final var uri = MODULE_URI_WITH_MOUNT + (hasRevision ? REVISION_PARAM : "");
final var request = buildRequest(HttpMethod.GET, uri, encoding, null);
- final var response = dispatch(request);
+ final var response = dispatchWithAlloc(request);
assertResponse(response, HttpResponseStatus.OK, encoding.responseType, content);
}
}
final var request = buildRequest(HttpMethod.GET, MODULE_URI, encoding, null);
- final var response = dispatch(request);
+ final var response = dispatchWithAlloc(request);
assertEquals(HttpResponseStatus.INTERNAL_SERVER_ERROR, response.status());
assertEquals("source-read-failure", response.content().toString(StandardCharsets.UTF_8));
}
doAnswer(answerCompleteWith(result)).when(server).operationsGET(any());
final var request = buildRequest(HttpMethod.GET, OPERATIONS_PATH, encoding, null);
- final var response = dispatch(request);
+ final var response = dispatchWithAlloc(request);
assertResponse(response, HttpResponseStatus.OK, encoding.responseType, content);
}
doAnswer(answerCompleteWith(result)).when(server).operationsGET(any(), any(ApiPath.class));
final var request = buildRequest(HttpMethod.GET, OPERATIONS_PATH_WITH_ID, encoding, null);
- final var response = dispatch(request);
+ final var response = dispatchWithAlloc(request);
verify(server).operationsGET(any(), apiPathCaptor.capture());
assertEquals(API_PATH, apiPathCaptor.getValue());
doAnswer(answer).when(server).operationsPOST(any(), any(), any(ApiPath.class), any(OperationInputBody.class));
final var request = buildRequest(HttpMethod.POST, OPERATIONS_PATH_WITH_ID, encoding, input);
- final var response = dispatch(request);
+ final var response = output == null ? dispatch(request) : dispatchWithAlloc(request);
verify(server).operationsPOST(any(), eq(RESTCONF_URI), apiPathCaptor.capture(), any());
assertEquals(API_PATH, apiPathCaptor.getValue());
final var content = alloc.buffer();
try (var out = new ByteBufOutputStream(content)) {
writeBody(out);
+ } catch (IOException e) {
+ content.release();
+ throw e;
}
return toHttpResponse(version, content);
}
* @param version HTTP version of the request
*/
@NonNullByDefault
- public record RequestContext(ChannelHandlerContext ctx, HttpVersion version, @Nullable Integer streamId) {
- public RequestContext {
+ private record RequestContext(ChannelHandlerContext ctx, HttpVersion version, @Nullable Integer streamId) {
+ RequestContext {
requireNonNull(ctx);
requireNonNull(version);
}
-
- public void respond(final FullHttpResponse response) {
- HTTPServerSession.respond(ctx, streamId, response);
- }
}
private static final Logger LOG = LoggerFactory.getLogger(HTTPServerSession.class);
switch (prepareRequest(method, targetUri, msg.headers())) {
case CompletedRequest completed -> {
msg.release();
- respond(ctx, streamId, formatResponse(completed.asResponse(), version));
+ respond(ctx, streamId, version, completed.asResponse());
}
case PendingRequest<?> pending -> {
LOG.debug("Dispatching {} {}", method, targetUri);
@Override
public void requestComplete(final PendingRequest<?> request, final Response response) {
- final var context = executingRequests.remove(request);
- if (context != null) {
- context.respond(formatResponse(response, context.version()));
+ final var req = executingRequests.remove(request);
+ if (req != null) {
+ respond(req.ctx, req.streamId, req.version, response);
} else {
LOG.warn("Cannot pair request {}, not sending response {}", request, response, new Throwable());
}
@Override
public void requestFailed(final PendingRequest<?> request, final Exception cause) {
LOG.warn("Internal error while processing {}", request, cause);
- final var context = executingRequests.remove(request);
- if (context != null) {
- context.respond(formatException(cause, context.version()));
+ final var req = executingRequests.remove(request);
+ if (req != null) {
+ respond(req.ctx, req.streamId, formatException(cause, req.version()));
} else {
LOG.warn("Cannot pair request, not sending response", new Throwable());
}
}
+ @NonNullByDefault
+ private static void respond(final ChannelHandlerContext ctx, final @Nullable Integer streamId,
+ final HttpVersion version, final Response response) {
+ respond(ctx, streamId, switch (response) {
+ case ReadyResponse ready -> ready.toHttpResponse(version);
+ default -> formatResponse(response, ctx, version);
+ });
+ }
+
+ @NonNullByDefault
+ private static void respond(final ChannelHandlerContext ctx, final @Nullable Integer streamId,
+ final FullHttpResponse response) {
+ requireNonNull(response);
+ if (streamId != null) {
+ response.headers().setInt(STREAM_ID, streamId);
+ }
+ ctx.writeAndFlush(response);
+ }
+
// FIXME: below payloads use a synchronous dump of data into the socket. We cannot safely do that on the event loop,
// because a slow client would end up throttling our IO threads simply because of TCP window and similar
// queuing/backpressure things.
// thing, talking to a short queue (SPSC?) of HttpObjects.
//
// the event loop of each channel would be the consumer of that queue, picking them off as quickly as
- // possible, but execting backpressure if the amount of pending stuff goes up.
+ // possible, but expecting backpressure if the amount of pending stuff goes up.
//
// as for the HttpObjects: this effectively means that the OutputStreams used in the below code should be
// replaced with entities which perform chunking:
// - finish up with a LastHttpContent
@NonNullByDefault
- private static FullHttpResponse formatResponse(final Response response, final HttpVersion version) {
+ private static FullHttpResponse formatResponse(final Response response, final ChannelHandlerContext ctx,
+ final HttpVersion version) {
try {
- // FIXME: require ChannelHandlerContext and use its allocator
- return response.toHttpResponse(version);
+ return response.toHttpResponse(ctx.alloc(), version);
} catch (IOException e) {
LOG.warn("IO error while converting formatting response", e);
return formatException(e, version);
HttpUtil.setContentLength(response, content.readableBytes());
return response;
}
-
- @NonNullByDefault
- protected static final void respond(final ChannelHandlerContext ctx, final @Nullable Integer streamId,
- final FullHttpResponse response) {
- requireNonNull(response);
- if (streamId != null) {
- response.headers().setInt(STREAM_ID, streamId);
- }
- ctx.writeAndFlush(response);
- }
}