From: Robert Varga Date: Thu, 4 Apr 2024 14:16:57 +0000 (+0200) Subject: Refactor transport-http response delivery X-Git-Tag: v7.0.5~72 X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=commitdiff_plain;h=refs%2Fchanges%2F67%2F111267%2F2;p=netconf.git Refactor transport-http response delivery Using a ListenableFuture brings in lifecycle headache, as the reported value is retained indefinitely -- forcing us to make copies of the response. Rework the APIs in terms of FutureCallback instead -- i.e. we report the response while having a reference to it and the callback can retain it if need be (in which case it takes ownership of it). Also introduce ClientRequestDispatcher to eliminate a bit of duplicate code. Change-Id: I350467582b5f5bd67d5241d47a5d4cbaef19adaa Signed-off-by: Robert Varga --- diff --git a/transport/transport-http/src/main/java/org/opendaylight/netconf/transport/http/ClientHttp1RequestDispatcher.java b/transport/transport-http/src/main/java/org/opendaylight/netconf/transport/http/ClientHttp1RequestDispatcher.java index e6a59f9d4f..5188f49a51 100644 --- a/transport/transport-http/src/main/java/org/opendaylight/netconf/transport/http/ClientHttp1RequestDispatcher.java +++ b/transport/transport-http/src/main/java/org/opendaylight/netconf/transport/http/ClientHttp1RequestDispatcher.java @@ -7,11 +7,9 @@ */ package org.opendaylight.netconf.transport.http; -import com.google.common.util.concurrent.ListenableFuture; -import com.google.common.util.concurrent.SettableFuture; +import com.google.common.util.concurrent.FutureCallback; import io.netty.channel.Channel; import io.netty.channel.ChannelHandlerContext; -import io.netty.channel.SimpleChannelInboundHandler; import io.netty.handler.codec.http.FullHttpRequest; import io.netty.handler.codec.http.FullHttpResponse; import java.util.Queue; @@ -26,54 +24,32 @@ import org.slf4j.LoggerFactory; * Serves as gateway to Netty {@link Channel}, performs sending requests to server, returns server responses associated. * Uses request to response mapping via queue -- first accepted response is associated with first request sent. */ -class ClientHttp1RequestDispatcher extends SimpleChannelInboundHandler implements RequestDispatcher { +final class ClientHttp1RequestDispatcher extends ClientRequestDispatcher { private static final Logger LOG = LoggerFactory.getLogger(ClientHttp1RequestDispatcher.class); - private final Queue> queue = new ConcurrentLinkedQueue<>(); - private Channel channel = null; - - ClientHttp1RequestDispatcher() { - super(true); // auto-release - } - - @Override - public void handlerAdded(final ChannelHandlerContext ctx) throws Exception { - channel = ctx.channel(); - super.handlerAdded(ctx); - } + // TODO: we access the queue only from Netty callbacks: can we use a plain ArrayDeque? + private final Queue> queue = new ConcurrentLinkedQueue<>(); @Override - public ListenableFuture dispatch(final FullHttpRequest request) { - if (channel == null) { - throw new IllegalStateException("Connection is not established yet"); - } - final var future = SettableFuture.create(); + public void dispatch(final Channel channel, final FullHttpRequest request, + final FutureCallback callback) { channel.writeAndFlush(request).addListener(sent -> { final var cause = sent.cause(); if (cause == null) { - queue.add(future); + queue.add(callback); } else { - future.setException(cause); + callback.onFailure(cause); } }); - return future; } @Override protected void channelRead0(final ChannelHandlerContext ctx, final FullHttpResponse response) { - final var future = queue.poll(); - if (future == null) { - LOG.warn("Unexpected response while no future associated -- Dropping response object {}", response); - return; - } - - if (!future.isDone()) { - // NB using response' copy to disconnect the content data from channel's buffer allocated. - // this prevents the content data became inaccessible once byte buffer of original message is released - // on exit of current method - future.set(response.copy()); + final var callback = queue.poll(); + if (callback != null) { + callback.onSuccess(response); } else { - LOG.warn("Future is already in Done state -- Dropping response object {}", response); + LOG.warn("Unexpected response while no future associated -- Dropping response object {}", response); } } } diff --git a/transport/transport-http/src/main/java/org/opendaylight/netconf/transport/http/ClientHttp2RequestDispatcher.java b/transport/transport-http/src/main/java/org/opendaylight/netconf/transport/http/ClientHttp2RequestDispatcher.java index 3d5089eb53..6089dbbe36 100644 --- a/transport/transport-http/src/main/java/org/opendaylight/netconf/transport/http/ClientHttp2RequestDispatcher.java +++ b/transport/transport-http/src/main/java/org/opendaylight/netconf/transport/http/ClientHttp2RequestDispatcher.java @@ -10,11 +10,9 @@ package org.opendaylight.netconf.transport.http; import static io.netty.handler.codec.http2.HttpConversionUtil.ExtensionHeaderNames.SCHEME; import static io.netty.handler.codec.http2.HttpConversionUtil.ExtensionHeaderNames.STREAM_ID; -import com.google.common.util.concurrent.ListenableFuture; -import com.google.common.util.concurrent.SettableFuture; +import com.google.common.util.concurrent.FutureCallback; import io.netty.channel.Channel; import io.netty.channel.ChannelHandlerContext; -import io.netty.channel.SimpleChannelInboundHandler; import io.netty.handler.codec.http.FullHttpRequest; import io.netty.handler.codec.http.FullHttpResponse; import io.netty.handler.codec.http.HttpScheme; @@ -32,50 +30,39 @@ import org.slf4j.LoggerFactory; * Serves as gateway to Netty {@link Channel}, performs sending requests to server, returns server responses associated. * Uses request to response mapping by stream identifier. */ -class ClientHttp2RequestDispatcher extends SimpleChannelInboundHandler implements RequestDispatcher { +final class ClientHttp2RequestDispatcher extends ClientRequestDispatcher { private static final Logger LOG = LoggerFactory.getLogger(ClientHttp2RequestDispatcher.class); - private final Map> map = new ConcurrentHashMap<>(); + // TODO: we access the queue only from Netty callbacks: can we use a plain HashMap? + private final Map> map = new ConcurrentHashMap<>(); + // identifier for streams initiated from client require to be odd-numbered, 1 is reserved + // see https://datatracker.ietf.org/doc/html/rfc7540#section-5.1.1 private final AtomicInteger streamIdCounter = new AtomicInteger(3); - private Channel channel = null; private boolean ssl = false; - ClientHttp2RequestDispatcher() { - super(true); // auto-release - } - - private Integer nextStreamId() { - // identifier for streams initiated from client require to be odd-numbered, 1 is reserved - // see https://datatracker.ietf.org/doc/html/rfc7540#section-5.1.1 - return streamIdCounter.getAndAdd(2); - } - @Override public void handlerAdded(final ChannelHandlerContext ctx) throws Exception { - channel = ctx.channel(); ssl = ctx.pipeline().get(SslHandler.class) != null; super.handlerAdded(ctx); } @Override - public ListenableFuture dispatch(final FullHttpRequest request) { - if (channel == null) { - throw new IllegalStateException("Connection is not established yet"); - } - final var streamId = nextStreamId(); - request.headers().setInt(STREAM_ID.text(), streamId); - request.headers().set(SCHEME.text(), ssl ? HttpScheme.HTTPS.name() : HttpScheme.HTTP.name()); + public void dispatch(final Channel channel, final FullHttpRequest request, + final FutureCallback callback) { + final var streamId = streamIdCounter.getAndAdd(2); + request.headers() + .setInt(STREAM_ID.text(), streamId) + .set(SCHEME.text(), ssl ? HttpScheme.HTTPS.name() : HttpScheme.HTTP.name()); - final var future = SettableFuture.create(); channel.writeAndFlush(request).addListener(sent -> { - if (sent.cause() == null) { - map.put(streamId, future); + final var cause = sent.cause(); + if (cause == null) { + map.put(streamId, callback); } else { - future.setException(sent.cause()); + callback.onFailure(cause); } }); - return future; } @Override @@ -85,19 +72,12 @@ class ClientHttp2RequestDispatcher extends SimpleChannelInboundHandler + implements RequestDispatcher { + private Channel channel = null; + + @Override + public void handlerAdded(final ChannelHandlerContext ctx) throws Exception { + channel = ctx.channel(); + super.handlerAdded(ctx); + } + + @Override + public final void dispatch(final FullHttpRequest request, final FutureCallback callback) { + final var local = channel; + if (local != null) { + dispatch(local, requireNonNull(request), requireNonNull(callback)); + } else { + throw new IllegalStateException("Connection is not established yet"); + } + } + + abstract void dispatch(@NonNull Channel channel, @NonNull FullHttpRequest request, + @NonNull FutureCallback callback); +} diff --git a/transport/transport-http/src/main/java/org/opendaylight/netconf/transport/http/HTTPClient.java b/transport/transport-http/src/main/java/org/opendaylight/netconf/transport/http/HTTPClient.java index 45a3a36190..fa1c597ea3 100644 --- a/transport/transport-http/src/main/java/org/opendaylight/netconf/transport/http/HTTPClient.java +++ b/transport/transport-http/src/main/java/org/opendaylight/netconf/transport/http/HTTPClient.java @@ -9,10 +9,12 @@ package org.opendaylight.netconf.transport.http; import static java.util.Objects.requireNonNull; +import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.ListenableFuture; import io.netty.bootstrap.Bootstrap; import io.netty.handler.codec.http.FullHttpRequest; import io.netty.handler.codec.http.FullHttpResponse; +import org.eclipse.jdt.annotation.NonNull; import org.opendaylight.netconf.transport.api.TransportChannelListener; import org.opendaylight.netconf.transport.api.UnsupportedConfigurationException; import org.opendaylight.netconf.transport.tcp.TCPClient; @@ -28,7 +30,6 @@ import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.tls.client. * A {@link HTTPTransportStack} acting as a client. */ public final class HTTPClient extends HTTPTransportStack { - private final RequestDispatcher dispatcher; private HTTPClient(final TransportChannelListener listener, final HttpChannelInitializer channelInitializer, @@ -41,10 +42,11 @@ public final class HTTPClient extends HTTPTransportStack { * Invokes the HTTP request over established connection. * * @param request the full http request object - * @return a future providing full http response or cause in case of error + * @param callback invoked when the request completes */ - public ListenableFuture invoke(final FullHttpRequest request) { - return dispatcher.dispatch(requireNonNull(request)); + public void invoke(final @NonNull FullHttpRequest request, + final @NonNull FutureCallback<@NonNull FullHttpResponse> callback) { + dispatcher.dispatch(request, callback); } /** diff --git a/transport/transport-http/src/main/java/org/opendaylight/netconf/transport/http/RequestDispatcher.java b/transport/transport-http/src/main/java/org/opendaylight/netconf/transport/http/RequestDispatcher.java index 7373f0c48d..918f2b3f6f 100644 --- a/transport/transport-http/src/main/java/org/opendaylight/netconf/transport/http/RequestDispatcher.java +++ b/transport/transport-http/src/main/java/org/opendaylight/netconf/transport/http/RequestDispatcher.java @@ -7,22 +7,23 @@ */ package org.opendaylight.netconf.transport.http; -import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.FutureCallback; import io.netty.handler.codec.http.FullHttpRequest; import io.netty.handler.codec.http.FullHttpResponse; +import org.eclipse.jdt.annotation.NonNullByDefault; /** * Functional interface for HTTP request dispatcher. */ +@NonNullByDefault @FunctionalInterface public interface RequestDispatcher { - /** * Performs {@link FullHttpRequest} processing. Any error occurred is expected either to be returned within - * {@link FullHttpResponse} with appropriate HTTP status code or set as future cause. + * {@link FullHttpResponse} with appropriate HTTP status code or set as future cause. Note that * - * @param request http request - * @return future providing http response or cause in case of error. + * @param request HTTP request + * @param callback invoked when the request completes */ - ListenableFuture dispatch(FullHttpRequest request); + void dispatch(FullHttpRequest request, FutureCallback callback); } diff --git a/transport/transport-http/src/main/java/org/opendaylight/netconf/transport/http/ServerChannelInitializer.java b/transport/transport-http/src/main/java/org/opendaylight/netconf/transport/http/ServerChannelInitializer.java index 0ce9d51623..c791873f05 100644 --- a/transport/transport-http/src/main/java/org/opendaylight/netconf/transport/http/ServerChannelInitializer.java +++ b/transport/transport-http/src/main/java/org/opendaylight/netconf/transport/http/ServerChannelInitializer.java @@ -15,9 +15,7 @@ import static io.netty.handler.codec.http.HttpResponseStatus.INTERNAL_SERVER_ERR import static org.opendaylight.netconf.transport.http.Http2Utils.copyStreamId; import com.google.common.util.concurrent.FutureCallback; -import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; -import com.google.common.util.concurrent.MoreExecutors; import com.google.common.util.concurrent.SettableFuture; import io.netty.buffer.Unpooled; import io.netty.channel.Channel; @@ -57,7 +55,6 @@ class ServerChannelInitializer extends ChannelInitializer implements Ht private final RequestDispatcher dispatcher; ServerChannelInitializer(final HttpServerGrouping httpParams, final RequestDispatcher dispatcher) { - super(); authHandler = BasicAuthHandler.ofNullable(httpParams); this.dispatcher = dispatcher; } @@ -158,7 +155,7 @@ class ServerChannelInitializer extends ChannelInitializer implements Ht return new SimpleChannelInboundHandler() { @Override protected void channelRead0(final ChannelHandlerContext ctx, final FullHttpRequest request) { - Futures.addCallback(dispatcher.dispatch(request.retain()), new FutureCallback<>() { + dispatcher.dispatch(request.retain(), new FutureCallback<>() { @Override public void onSuccess(final FullHttpResponse response) { copyStreamId(request, response); @@ -178,7 +175,7 @@ class ServerChannelInitializer extends ChannelInitializer implements Ht .setInt(CONTENT_LENGTH, response.content().readableBytes()); onSuccess(response); } - }, MoreExecutors.directExecutor()); + }); } }; } diff --git a/transport/transport-http/src/test/java/org/opendaylight/netconf/transport/http/HttpClientServerTest.java b/transport/transport-http/src/test/java/org/opendaylight/netconf/transport/http/HttpClientServerTest.java index cc600b2e7a..ba9bcaa9b5 100644 --- a/transport/transport-http/src/test/java/org/opendaylight/netconf/transport/http/HttpClientServerTest.java +++ b/transport/transport-http/src/test/java/org/opendaylight/netconf/transport/http/HttpClientServerTest.java @@ -26,6 +26,7 @@ import static org.opendaylight.netconf.transport.http.ConfigUtils.clientTranspor import static org.opendaylight.netconf.transport.http.ConfigUtils.serverTransportTcp; import static org.opendaylight.netconf.transport.http.ConfigUtils.serverTransportTls; +import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.SettableFuture; import io.netty.handler.codec.http.DefaultFullHttpRequest; import io.netty.handler.codec.http.DefaultFullHttpResponse; @@ -99,8 +100,7 @@ public class HttpClientServerTest { localAddress = InetAddress.getLoopbackAddress().getHostAddress(); scheduledExecutor = Executors.newSingleThreadScheduledExecutor(); - requestDispatcher = request -> { - final var future = SettableFuture.create(); + requestDispatcher = (request, callback) -> { // emulate asynchronous server request processing - run in separate thread with 100 millis delay scheduledExecutor.schedule(() -> { // return 200 response with a content built from request parameters @@ -113,9 +113,8 @@ public class HttpClientServerTest { wrappedBuffer(responseMessage.getBytes(StandardCharsets.UTF_8))); response.headers().set(CONTENT_TYPE, TEXT_PLAIN) .setInt(CONTENT_LENGTH, response.content().readableBytes()); - return future.set(response); + callback.onSuccess(response); }, 100, TimeUnit.MILLISECONDS); - return future; }; } @@ -188,7 +187,20 @@ public class HttpClientServerTest { // allow multiple requests on same connections .set(CONNECTION, KEEP_ALIVE); - final var response = client.invoke(request).get(2, TimeUnit.SECONDS); + final var future = SettableFuture.create(); + client.invoke(request, new FutureCallback<>() { + @Override + public void onSuccess(final FullHttpResponse result) { + future.set(result.copy()); + } + + @Override + public void onFailure(final Throwable cause) { + future.setException(cause); + } + }); + + final var response = future.get(2, TimeUnit.SECONDS); assertNotNull(response); assertEquals(OK, response.status()); final var expected = RESPONSE_TEMPLATE.formatted(method, uri, payload);