Refactor transport-http response delivery 67/111267/2
authorRobert Varga <robert.varga@pantheon.tech>
Thu, 4 Apr 2024 14:16:57 +0000 (16:16 +0200)
committerRobert Varga <robert.varga@pantheon.tech>
Thu, 4 Apr 2024 14:21:45 +0000 (16:21 +0200)
Using a ListenableFuture brings in lifecycle headache, as the reported
value is retained indefinitely -- forcing us to make copies of the
response.

Rework the APIs in terms of FutureCallback instead -- i.e. we report the
response while having a reference to it and the callback can retain it
if need be (in which case it takes ownership of it).

Also introduce ClientRequestDispatcher to eliminate a bit of duplicate
code.

Change-Id: I350467582b5f5bd67d5241d47a5d4cbaef19adaa
Signed-off-by: Robert Varga <robert.varga@pantheon.tech>
transport/transport-http/src/main/java/org/opendaylight/netconf/transport/http/ClientHttp1RequestDispatcher.java
transport/transport-http/src/main/java/org/opendaylight/netconf/transport/http/ClientHttp2RequestDispatcher.java
transport/transport-http/src/main/java/org/opendaylight/netconf/transport/http/ClientRequestDispatcher.java [new file with mode: 0644]
transport/transport-http/src/main/java/org/opendaylight/netconf/transport/http/HTTPClient.java
transport/transport-http/src/main/java/org/opendaylight/netconf/transport/http/RequestDispatcher.java
transport/transport-http/src/main/java/org/opendaylight/netconf/transport/http/ServerChannelInitializer.java
transport/transport-http/src/test/java/org/opendaylight/netconf/transport/http/HttpClientServerTest.java

index e6a59f9d4f9ef5c542a93b796c6ed9ebd8f58049..5188f49a51b57813c208db03d5ed75229acee489 100644 (file)
@@ -7,11 +7,9 @@
  */
 package org.opendaylight.netconf.transport.http;
 
-import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.SettableFuture;
+import com.google.common.util.concurrent.FutureCallback;
 import io.netty.channel.Channel;
 import io.netty.channel.ChannelHandlerContext;
-import io.netty.channel.SimpleChannelInboundHandler;
 import io.netty.handler.codec.http.FullHttpRequest;
 import io.netty.handler.codec.http.FullHttpResponse;
 import java.util.Queue;
@@ -26,54 +24,32 @@ import org.slf4j.LoggerFactory;
  * Serves as gateway to Netty {@link Channel}, performs sending requests to server, returns server responses associated.
  * Uses request to response mapping via queue -- first accepted response is associated with first request sent.
  */
-class ClientHttp1RequestDispatcher extends SimpleChannelInboundHandler<FullHttpResponse> implements RequestDispatcher {
+final class ClientHttp1RequestDispatcher extends ClientRequestDispatcher {
     private static final Logger LOG = LoggerFactory.getLogger(ClientHttp1RequestDispatcher.class);
 
-    private final Queue<SettableFuture<FullHttpResponse>> queue = new ConcurrentLinkedQueue<>();
-    private Channel channel = null;
-
-    ClientHttp1RequestDispatcher() {
-        super(true); // auto-release
-    }
-
-    @Override
-    public void handlerAdded(final ChannelHandlerContext ctx) throws Exception {
-        channel = ctx.channel();
-        super.handlerAdded(ctx);
-    }
+    // TODO: we access the queue only from Netty callbacks: can we use a plain ArrayDeque?
+    private final Queue<FutureCallback<FullHttpResponse>> queue = new ConcurrentLinkedQueue<>();
 
     @Override
-    public ListenableFuture<FullHttpResponse> dispatch(final FullHttpRequest request) {
-        if (channel == null) {
-            throw new IllegalStateException("Connection is not established yet");
-        }
-        final var future = SettableFuture.<FullHttpResponse>create();
+    public void dispatch(final Channel channel, final FullHttpRequest request,
+            final FutureCallback<FullHttpResponse> callback) {
         channel.writeAndFlush(request).addListener(sent -> {
             final var cause = sent.cause();
             if (cause == null) {
-                queue.add(future);
+                queue.add(callback);
             } else {
-                future.setException(cause);
+                callback.onFailure(cause);
             }
         });
-        return future;
     }
 
     @Override
     protected void channelRead0(final ChannelHandlerContext ctx, final FullHttpResponse response) {
-        final var future = queue.poll();
-        if (future == null) {
-            LOG.warn("Unexpected response while no future associated -- Dropping response object {}", response);
-            return;
-        }
-
-        if (!future.isDone()) {
-            // NB using response' copy to disconnect the content data from channel's buffer allocated.
-            // this prevents the content data became inaccessible once byte buffer of original message is released
-            // on exit of current method
-            future.set(response.copy());
+        final var callback = queue.poll();
+        if (callback != null) {
+            callback.onSuccess(response);
         } else {
-            LOG.warn("Future is already in Done state -- Dropping response object {}", response);
+            LOG.warn("Unexpected response while no future associated -- Dropping response object {}", response);
         }
     }
 }
index 3d5089eb531729b559fab520f38cd07ef23e93f0..6089dbbe3620cd7806e8eb1399c7523f4258e643 100644 (file)
@@ -10,11 +10,9 @@ package org.opendaylight.netconf.transport.http;
 import static io.netty.handler.codec.http2.HttpConversionUtil.ExtensionHeaderNames.SCHEME;
 import static io.netty.handler.codec.http2.HttpConversionUtil.ExtensionHeaderNames.STREAM_ID;
 
-import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.SettableFuture;
+import com.google.common.util.concurrent.FutureCallback;
 import io.netty.channel.Channel;
 import io.netty.channel.ChannelHandlerContext;
-import io.netty.channel.SimpleChannelInboundHandler;
 import io.netty.handler.codec.http.FullHttpRequest;
 import io.netty.handler.codec.http.FullHttpResponse;
 import io.netty.handler.codec.http.HttpScheme;
@@ -32,50 +30,39 @@ import org.slf4j.LoggerFactory;
  * Serves as gateway to Netty {@link Channel}, performs sending requests to server, returns server responses associated.
  * Uses request to response mapping by stream identifier.
  */
-class ClientHttp2RequestDispatcher extends SimpleChannelInboundHandler<FullHttpResponse> implements RequestDispatcher {
+final class ClientHttp2RequestDispatcher extends ClientRequestDispatcher {
     private static final Logger LOG = LoggerFactory.getLogger(ClientHttp2RequestDispatcher.class);
 
-    private final Map<Integer, SettableFuture<FullHttpResponse>> map = new ConcurrentHashMap<>();
+    // TODO: we access the queue only from Netty callbacks: can we use a plain HashMap?
+    private final Map<Integer, FutureCallback<FullHttpResponse>> map = new ConcurrentHashMap<>();
+    // identifier for streams initiated from client require to be odd-numbered, 1 is reserved
+    // see https://datatracker.ietf.org/doc/html/rfc7540#section-5.1.1
     private final AtomicInteger streamIdCounter = new AtomicInteger(3);
 
-    private Channel channel = null;
     private boolean ssl = false;
 
-    ClientHttp2RequestDispatcher() {
-        super(true); // auto-release
-    }
-
-    private Integer nextStreamId() {
-        // identifier for streams initiated from client require to be odd-numbered, 1 is reserved
-        // see https://datatracker.ietf.org/doc/html/rfc7540#section-5.1.1
-        return streamIdCounter.getAndAdd(2);
-    }
-
     @Override
     public void handlerAdded(final ChannelHandlerContext ctx) throws Exception {
-        channel = ctx.channel();
         ssl = ctx.pipeline().get(SslHandler.class) != null;
         super.handlerAdded(ctx);
     }
 
     @Override
-    public ListenableFuture<FullHttpResponse> dispatch(final FullHttpRequest request) {
-        if (channel == null) {
-            throw new IllegalStateException("Connection is not established yet");
-        }
-        final var streamId = nextStreamId();
-        request.headers().setInt(STREAM_ID.text(), streamId);
-        request.headers().set(SCHEME.text(), ssl ? HttpScheme.HTTPS.name() : HttpScheme.HTTP.name());
+    public void dispatch(final Channel channel, final FullHttpRequest request,
+            final FutureCallback<FullHttpResponse> callback) {
+        final var streamId = streamIdCounter.getAndAdd(2);
+        request.headers()
+            .setInt(STREAM_ID.text(), streamId)
+            .set(SCHEME.text(), ssl ? HttpScheme.HTTPS.name() : HttpScheme.HTTP.name());
 
-        final var future = SettableFuture.<FullHttpResponse>create();
         channel.writeAndFlush(request).addListener(sent -> {
-            if (sent.cause() == null) {
-                map.put(streamId, future);
+            final var cause = sent.cause();
+            if (cause == null) {
+                map.put(streamId, callback);
             } else {
-                future.setException(sent.cause());
+                callback.onFailure(cause);
             }
         });
-        return future;
     }
 
     @Override
@@ -85,19 +72,12 @@ class ClientHttp2RequestDispatcher extends SimpleChannelInboundHandler<FullHttpR
             LOG.warn("Unexpected response with no stream ID -- Dropping response object {}", response);
             return;
         }
-        final var future = map.remove(streamId);
-        if (future == null) {
+        final var callback = map.remove(streamId);
+        if (callback != null) {
+            callback.onSuccess(response);
+        } else {
             LOG.warn("Unexpected response with unknown or expired stream ID {} -- Dropping response object {}",
                 streamId, response);
-            return;
-        }
-        if (!future.isDone()) {
-            // NB using response' copy to disconnect the content data from channel's buffer allocated.
-            // this prevents the content data became inaccessible once byte buffer of original message is released
-            // on exit of current method
-            future.set(response.copy());
-        } else {
-            LOG.warn("Future is already in Done state -- Dropping response object {}", response);
         }
     }
 }
diff --git a/transport/transport-http/src/main/java/org/opendaylight/netconf/transport/http/ClientRequestDispatcher.java b/transport/transport-http/src/main/java/org/opendaylight/netconf/transport/http/ClientRequestDispatcher.java
new file mode 100644 (file)
index 0000000..f6ae88d
--- /dev/null
@@ -0,0 +1,42 @@
+/*
+ * Copyright (c) 2024 PANTHEON.tech, s.r.o. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.netconf.transport.http;
+
+import static java.util.Objects.requireNonNull;
+
+import com.google.common.util.concurrent.FutureCallback;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.SimpleChannelInboundHandler;
+import io.netty.handler.codec.http.FullHttpRequest;
+import io.netty.handler.codec.http.FullHttpResponse;
+import org.eclipse.jdt.annotation.NonNull;
+
+abstract class ClientRequestDispatcher extends SimpleChannelInboundHandler<FullHttpResponse>
+        implements RequestDispatcher {
+    private Channel channel = null;
+
+    @Override
+    public void handlerAdded(final ChannelHandlerContext ctx) throws Exception {
+        channel = ctx.channel();
+        super.handlerAdded(ctx);
+    }
+
+    @Override
+    public final void dispatch(final FullHttpRequest request, final FutureCallback<FullHttpResponse> callback) {
+        final var local = channel;
+        if (local != null) {
+            dispatch(local, requireNonNull(request), requireNonNull(callback));
+        } else {
+            throw new IllegalStateException("Connection is not established yet");
+        }
+    }
+
+    abstract void dispatch(@NonNull Channel channel, @NonNull FullHttpRequest request,
+        @NonNull FutureCallback<FullHttpResponse> callback);
+}
index 45a3a36190c62512a071ce4cc39935a5cbd41acc..fa1c597ea382f4b281c41c6903a65f2bbc95c5d5 100644 (file)
@@ -9,10 +9,12 @@ package org.opendaylight.netconf.transport.http;
 
 import static java.util.Objects.requireNonNull;
 
+import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.ListenableFuture;
 import io.netty.bootstrap.Bootstrap;
 import io.netty.handler.codec.http.FullHttpRequest;
 import io.netty.handler.codec.http.FullHttpResponse;
+import org.eclipse.jdt.annotation.NonNull;
 import org.opendaylight.netconf.transport.api.TransportChannelListener;
 import org.opendaylight.netconf.transport.api.UnsupportedConfigurationException;
 import org.opendaylight.netconf.transport.tcp.TCPClient;
@@ -28,7 +30,6 @@ import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.tls.client.
  * A {@link HTTPTransportStack} acting as a client.
  */
 public final class HTTPClient extends HTTPTransportStack {
-
     private final RequestDispatcher dispatcher;
 
     private HTTPClient(final TransportChannelListener listener, final HttpChannelInitializer channelInitializer,
@@ -41,10 +42,11 @@ public final class HTTPClient extends HTTPTransportStack {
      * Invokes the HTTP request over established connection.
      *
      * @param request the full http request object
-     * @return a future providing full http response or cause in case of error
+     * @param callback invoked when the request completes
      */
-    public ListenableFuture<FullHttpResponse> invoke(final FullHttpRequest request) {
-        return dispatcher.dispatch(requireNonNull(request));
+    public void invoke(final @NonNull FullHttpRequest request,
+            final @NonNull FutureCallback<@NonNull FullHttpResponse> callback) {
+        dispatcher.dispatch(request, callback);
     }
 
     /**
index 7373f0c48d79e36de66f1553d26f96dc92ab8608..918f2b3f6fa3bd90b63d9a8867d0d133cedf3907 100644 (file)
@@ -7,22 +7,23 @@
  */
 package org.opendaylight.netconf.transport.http;
 
-import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.FutureCallback;
 import io.netty.handler.codec.http.FullHttpRequest;
 import io.netty.handler.codec.http.FullHttpResponse;
+import org.eclipse.jdt.annotation.NonNullByDefault;
 
 /**
  * Functional interface for HTTP request dispatcher.
  */
+@NonNullByDefault
 @FunctionalInterface
 public interface RequestDispatcher {
-
     /**
      * Performs {@link FullHttpRequest} processing. Any error occurred is expected either to be returned within
-     * {@link FullHttpResponse} with appropriate HTTP status code or set as future cause.
+     * {@link FullHttpResponse} with appropriate HTTP status code or set as future cause. Note that
      *
-     * @param request http request
-     * @return future providing http response or cause in case of error.
+     * @param request HTTP request
+     * @param callback invoked when the request completes
      */
-    ListenableFuture<FullHttpResponse> dispatch(FullHttpRequest request);
+    void dispatch(FullHttpRequest request, FutureCallback<FullHttpResponse> callback);
 }
index 0ce9d516232cd1adf29cf1eef9c264940962e36b..c791873f05ca07f7629d3dee942cdcfae97a2b6c 100644 (file)
@@ -15,9 +15,7 @@ import static io.netty.handler.codec.http.HttpResponseStatus.INTERNAL_SERVER_ERR
 import static org.opendaylight.netconf.transport.http.Http2Utils.copyStreamId;
 
 import com.google.common.util.concurrent.FutureCallback;
-import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.MoreExecutors;
 import com.google.common.util.concurrent.SettableFuture;
 import io.netty.buffer.Unpooled;
 import io.netty.channel.Channel;
@@ -57,7 +55,6 @@ class ServerChannelInitializer extends ChannelInitializer<Channel> implements Ht
     private final RequestDispatcher dispatcher;
 
     ServerChannelInitializer(final HttpServerGrouping httpParams, final RequestDispatcher dispatcher) {
-        super();
         authHandler = BasicAuthHandler.ofNullable(httpParams);
         this.dispatcher = dispatcher;
     }
@@ -158,7 +155,7 @@ class ServerChannelInitializer extends ChannelInitializer<Channel> implements Ht
         return new SimpleChannelInboundHandler<FullHttpRequest>() {
             @Override
             protected void channelRead0(final ChannelHandlerContext ctx, final FullHttpRequest request) {
-                Futures.addCallback(dispatcher.dispatch(request.retain()), new FutureCallback<>() {
+                dispatcher.dispatch(request.retain(), new FutureCallback<>() {
                     @Override
                     public void onSuccess(final FullHttpResponse response) {
                         copyStreamId(request, response);
@@ -178,7 +175,7 @@ class ServerChannelInitializer extends ChannelInitializer<Channel> implements Ht
                             .setInt(CONTENT_LENGTH, response.content().readableBytes());
                         onSuccess(response);
                     }
-                }, MoreExecutors.directExecutor());
+                });
             }
         };
     }
index cc600b2e7aa26fe2e37aa9557cc63a0e77975690..ba9bcaa9b5447a6a22e49d17b84d229391abcd34 100644 (file)
@@ -26,6 +26,7 @@ import static org.opendaylight.netconf.transport.http.ConfigUtils.clientTranspor
 import static org.opendaylight.netconf.transport.http.ConfigUtils.serverTransportTcp;
 import static org.opendaylight.netconf.transport.http.ConfigUtils.serverTransportTls;
 
+import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.SettableFuture;
 import io.netty.handler.codec.http.DefaultFullHttpRequest;
 import io.netty.handler.codec.http.DefaultFullHttpResponse;
@@ -99,8 +100,7 @@ public class HttpClientServerTest {
         localAddress = InetAddress.getLoopbackAddress().getHostAddress();
         scheduledExecutor = Executors.newSingleThreadScheduledExecutor();
 
-        requestDispatcher = request -> {
-            final var future = SettableFuture.<FullHttpResponse>create();
+        requestDispatcher = (request, callback) -> {
             // emulate asynchronous server request processing - run in separate thread with 100 millis delay
             scheduledExecutor.schedule(() -> {
                 // return 200 response with a content built from request parameters
@@ -113,9 +113,8 @@ public class HttpClientServerTest {
                     wrappedBuffer(responseMessage.getBytes(StandardCharsets.UTF_8)));
                 response.headers().set(CONTENT_TYPE, TEXT_PLAIN)
                     .setInt(CONTENT_LENGTH, response.content().readableBytes());
-                return future.set(response);
+                callback.onSuccess(response);
             }, 100, TimeUnit.MILLISECONDS);
-            return future;
         };
     }
 
@@ -188,7 +187,20 @@ public class HttpClientServerTest {
                         // allow multiple requests on same connections
                         .set(CONNECTION, KEEP_ALIVE);
 
-                    final var response = client.invoke(request).get(2, TimeUnit.SECONDS);
+                    final var future = SettableFuture.<FullHttpResponse>create();
+                    client.invoke(request, new FutureCallback<>() {
+                        @Override
+                        public void onSuccess(final FullHttpResponse result) {
+                            future.set(result.copy());
+                        }
+
+                        @Override
+                        public void onFailure(final Throwable cause) {
+                            future.setException(cause);
+                        }
+                    });
+
+                    final var response = future.get(2, TimeUnit.SECONDS);
                     assertNotNull(response);
                     assertEquals(OK, response.status());
                     final var expected = RESPONSE_TEMPLATE.formatted(method, uri, payload);