Refactor transport-http response delivery
[netconf.git] / transport / transport-http / src / main / java / org / opendaylight / netconf / transport / http / ClientHttp1RequestDispatcher.java
index e6a59f9d4f9ef5c542a93b796c6ed9ebd8f58049..5188f49a51b57813c208db03d5ed75229acee489 100644 (file)
@@ -7,11 +7,9 @@
  */
 package org.opendaylight.netconf.transport.http;
 
-import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.SettableFuture;
+import com.google.common.util.concurrent.FutureCallback;
 import io.netty.channel.Channel;
 import io.netty.channel.ChannelHandlerContext;
-import io.netty.channel.SimpleChannelInboundHandler;
 import io.netty.handler.codec.http.FullHttpRequest;
 import io.netty.handler.codec.http.FullHttpResponse;
 import java.util.Queue;
@@ -26,54 +24,32 @@ import org.slf4j.LoggerFactory;
  * Serves as gateway to Netty {@link Channel}, performs sending requests to server, returns server responses associated.
  * Uses request to response mapping via queue -- first accepted response is associated with first request sent.
  */
-class ClientHttp1RequestDispatcher extends SimpleChannelInboundHandler<FullHttpResponse> implements RequestDispatcher {
+final class ClientHttp1RequestDispatcher extends ClientRequestDispatcher {
     private static final Logger LOG = LoggerFactory.getLogger(ClientHttp1RequestDispatcher.class);
 
-    private final Queue<SettableFuture<FullHttpResponse>> queue = new ConcurrentLinkedQueue<>();
-    private Channel channel = null;
-
-    ClientHttp1RequestDispatcher() {
-        super(true); // auto-release
-    }
-
-    @Override
-    public void handlerAdded(final ChannelHandlerContext ctx) throws Exception {
-        channel = ctx.channel();
-        super.handlerAdded(ctx);
-    }
+    // TODO: we access the queue only from Netty callbacks: can we use a plain ArrayDeque?
+    private final Queue<FutureCallback<FullHttpResponse>> queue = new ConcurrentLinkedQueue<>();
 
     @Override
-    public ListenableFuture<FullHttpResponse> dispatch(final FullHttpRequest request) {
-        if (channel == null) {
-            throw new IllegalStateException("Connection is not established yet");
-        }
-        final var future = SettableFuture.<FullHttpResponse>create();
+    public void dispatch(final Channel channel, final FullHttpRequest request,
+            final FutureCallback<FullHttpResponse> callback) {
         channel.writeAndFlush(request).addListener(sent -> {
             final var cause = sent.cause();
             if (cause == null) {
-                queue.add(future);
+                queue.add(callback);
             } else {
-                future.setException(cause);
+                callback.onFailure(cause);
             }
         });
-        return future;
     }
 
     @Override
     protected void channelRead0(final ChannelHandlerContext ctx, final FullHttpResponse response) {
-        final var future = queue.poll();
-        if (future == null) {
-            LOG.warn("Unexpected response while no future associated -- Dropping response object {}", response);
-            return;
-        }
-
-        if (!future.isDone()) {
-            // NB using response' copy to disconnect the content data from channel's buffer allocated.
-            // this prevents the content data became inaccessible once byte buffer of original message is released
-            // on exit of current method
-            future.set(response.copy());
+        final var callback = queue.poll();
+        if (callback != null) {
+            callback.onSuccess(response);
         } else {
-            LOG.warn("Future is already in Done state -- Dropping response object {}", response);
+            LOG.warn("Unexpected response while no future associated -- Dropping response object {}", response);
         }
     }
 }