X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=transport%2Ftransport-http%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fnetconf%2Ftransport%2Fhttp%2FClientHttp1RequestDispatcher.java;fp=transport%2Ftransport-http%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fnetconf%2Ftransport%2Fhttp%2FClientHttp1RequestDispatcher.java;h=e6a59f9d4f9ef5c542a93b796c6ed9ebd8f58049;hb=e55255954ebfc7ff510513bb2bff31e156a33042;hp=0000000000000000000000000000000000000000;hpb=2c347423e1684f28b9dea56b6517055294174b3a;p=netconf.git diff --git a/transport/transport-http/src/main/java/org/opendaylight/netconf/transport/http/ClientHttp1RequestDispatcher.java b/transport/transport-http/src/main/java/org/opendaylight/netconf/transport/http/ClientHttp1RequestDispatcher.java new file mode 100644 index 0000000000..e6a59f9d4f --- /dev/null +++ b/transport/transport-http/src/main/java/org/opendaylight/netconf/transport/http/ClientHttp1RequestDispatcher.java @@ -0,0 +1,79 @@ +/* + * Copyright (c) 2024 PANTHEON.tech s.r.o. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.netconf.transport.http; + +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.SettableFuture; +import io.netty.channel.Channel; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.SimpleChannelInboundHandler; +import io.netty.handler.codec.http.FullHttpRequest; +import io.netty.handler.codec.http.FullHttpResponse; +import java.util.Queue; +import java.util.concurrent.ConcurrentLinkedQueue; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Client side {@link RequestDispatcher} implementation for HTTP 1.1. + * + *

+ * Serves as gateway to Netty {@link Channel}, performs sending requests to server, returns server responses associated. + * Uses request to response mapping via queue -- first accepted response is associated with first request sent. + */ +class ClientHttp1RequestDispatcher extends SimpleChannelInboundHandler implements RequestDispatcher { + private static final Logger LOG = LoggerFactory.getLogger(ClientHttp1RequestDispatcher.class); + + private final Queue> queue = new ConcurrentLinkedQueue<>(); + private Channel channel = null; + + ClientHttp1RequestDispatcher() { + super(true); // auto-release + } + + @Override + public void handlerAdded(final ChannelHandlerContext ctx) throws Exception { + channel = ctx.channel(); + super.handlerAdded(ctx); + } + + @Override + public ListenableFuture dispatch(final FullHttpRequest request) { + if (channel == null) { + throw new IllegalStateException("Connection is not established yet"); + } + final var future = SettableFuture.create(); + channel.writeAndFlush(request).addListener(sent -> { + final var cause = sent.cause(); + if (cause == null) { + queue.add(future); + } else { + future.setException(cause); + } + }); + return future; + } + + @Override + protected void channelRead0(final ChannelHandlerContext ctx, final FullHttpResponse response) { + final var future = queue.poll(); + if (future == null) { + LOG.warn("Unexpected response while no future associated -- Dropping response object {}", response); + return; + } + + if (!future.isDone()) { + // NB using response' copy to disconnect the content data from channel's buffer allocated. + // this prevents the content data became inaccessible once byte buffer of original message is released + // on exit of current method + future.set(response.copy()); + } else { + LOG.warn("Future is already in Done state -- Dropping response object {}", response); + } + } +}