Create Listener to Sender interface 52/114952/15
authorSamuel Schneider <samuel.schneider@pantheon.tech>
Mon, 27 Jan 2025 08:14:09 +0000 (09:14 +0100)
committerIvan Hrasko <ivan.hrasko@pantheon.tech>
Mon, 10 Feb 2025 08:39:20 +0000 (08:39 +0000)
Fully switch to refactored restconf-server SSE support.
Remove old unused logic.

For this create SSEResponse to manage SSE connection.

JIRA: NETCONF-714
Change-Id: I287bee2b2863837ea94e54d1b14e9ac9f813401b
Signed-off-by: Samuel Schneider <samuel.schneider@pantheon.tech>
19 files changed:
apps/restconf-it/src/test/java/org/opendaylight/restconf/server/netty/AbstractE2ETest.java
protocol/restconf-server/src/main/java/org/opendaylight/restconf/server/APIResource.java
protocol/restconf-server/src/main/java/org/opendaylight/restconf/server/ChannelSender.java
protocol/restconf-server/src/main/java/org/opendaylight/restconf/server/RestconfStreamService.java [deleted file]
protocol/restconf-server/src/main/java/org/opendaylight/restconf/server/RestconfTransportChannelListener.java
protocol/restconf-server/src/main/java/org/opendaylight/restconf/server/StreamsResource.java
protocol/restconf-server/src/test/java/org/opendaylight/restconf/server/AbstractRequestProcessorTest.java
protocol/restconf-server/src/test/java/org/opendaylight/restconf/server/RestconfSessionTest.java
protocol/restconf-server/src/test/java/org/opendaylight/restconf/server/RestconfStreamServiceTest.java [deleted file]
transport/transport-http/src/main/java/org/opendaylight/netconf/transport/http/ClientHttp1SseService.java
transport/transport-http/src/main/java/org/opendaylight/netconf/transport/http/ClientHttp2SseService.java
transport/transport-http/src/main/java/org/opendaylight/netconf/transport/http/EventStreamResponse.java [new file with mode: 0644]
transport/transport-http/src/main/java/org/opendaylight/netconf/transport/http/EventStreamService.java
transport/transport-http/src/main/java/org/opendaylight/netconf/transport/http/HTTPServerSession.java
transport/transport-http/src/main/java/org/opendaylight/netconf/transport/http/Response.java
transport/transport-http/src/main/java/org/opendaylight/netconf/transport/http/ServerRequestExecutor.java
transport/transport-http/src/main/java/org/opendaylight/netconf/transport/http/ServerSseHandler.java [deleted file]
transport/transport-http/src/main/java/org/opendaylight/netconf/transport/http/SseUtils.java
transport/transport-http/src/test/java/org/opendaylight/netconf/transport/http/SseClientServerTest.java [deleted file]

index c4249cbbabac7c070027facf9b1caa64f443ce23..43cd32d636b3266eefdf0379fe2227c7ec5210e1 100644 (file)
@@ -431,7 +431,7 @@ abstract class AbstractE2ETest extends AbstractDataBrokerTest {
 
     protected TestEventStreamListener startStream(final String uri) {
         final var eventListener = new TestEventStreamListener();
-        clientStreamService.startEventStream(uri, eventListener,
+        clientStreamService.startEventStream("localhost", uri, eventListener,
             new EventStreamService.StartCallback() {
                 @Override
                 public void onStreamStarted(final EventStreamService.StreamControl control) {
index 547ff941ec7938db837a7219fe7e4b90e2a7b003..ad8cda692fa4a3c3ab923b73c53110de9aacfd0c 100644 (file)
@@ -41,8 +41,9 @@ final class APIResource extends AbstractResource {
 
     @NonNullByDefault
     APIResource(final RestconfServer server, final List<String> otherSegments, final String restconfPath,
-            final ErrorTagMapping errorTagMapping, final MessageEncoding defaultEncoding,
-            final PrettyPrintParam defaultPrettyPrint, final RestconfStream.Registry streamRegistry) {
+                final ErrorTagMapping errorTagMapping, final MessageEncoding defaultEncoding,
+                final PrettyPrintParam defaultPrettyPrint, final int sseHeartbeatIntervalMillis,
+                final int sseMaximumFragmentLength, final RestconfStream.Registry streamRegistry) {
         super(new EndpointInvariants(server, defaultPrettyPrint, errorTagMapping, defaultEncoding,
             URI.create(requireNonNull(restconfPath))));
         this.otherSegments = requireNonNull(otherSegments);
@@ -52,7 +53,8 @@ final class APIResource extends AbstractResource {
             "operations", new OperationsResource(invariants),
             "yang-library-version", new YLVResource(invariants),
             "modules", new ModulesResource(invariants),
-            "streams", new StreamsResource(invariants, streamRegistry));
+            "streams", new StreamsResource(invariants, streamRegistry, sseHeartbeatIntervalMillis,
+                sseMaximumFragmentLength));
     }
 
     @Override
index 1af19930a4db1895d60e6d1285feec8a89ec07b1..5b264cd0dc3b91cc590f4e255c062dc004107437 100644 (file)
@@ -8,10 +8,13 @@
 package org.opendaylight.restconf.server;
 
 import static java.util.Objects.requireNonNull;
+import static org.opendaylight.netconf.transport.http.SseUtils.chunksOf;
 
-import io.netty.buffer.ByteBufUtil;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
 import io.netty.channel.ChannelHandlerContext;
 import io.netty.channel.SimpleChannelInboundHandler;
+import io.netty.handler.codec.http.DefaultHttpContent;
 import io.netty.handler.codec.http.FullHttpRequest;
 import org.opendaylight.restconf.server.spi.RestconfStream.Sender;
 import org.opendaylight.yangtools.concepts.Registration;
@@ -30,11 +33,17 @@ import org.opendaylight.yangtools.concepts.Registration;
  * are fully compliant with the HTTP specification.
  */
 final class ChannelSender extends SimpleChannelInboundHandler<FullHttpRequest> implements Sender {
+    private static final ByteBuf EMPTY_LINE = Unpooled.wrappedBuffer(new byte[] { '\r', '\n' }).asReadOnly();
+
+    private final int sseMaximumFragmentLength;
+
     private Registration registration;
     private ChannelHandlerContext context;
 
-    ChannelSender() {
+    ChannelSender(final int sseMaximumFragmentLength) {
         super(FullHttpRequest.class);
+        this.sseMaximumFragmentLength = sseMaximumFragmentLength;
+        context = null;
     }
 
     void enable(final Registration reg) {
@@ -49,7 +58,9 @@ final class ChannelSender extends SimpleChannelInboundHandler<FullHttpRequest> i
     @Override
     public void sendDataMessage(final String data) {
         if (!data.isEmpty() && context != null) {
-            context.writeAndFlush(ByteBufUtil.writeUtf8(context.alloc(), data));
+            chunksOf("data", data, sseMaximumFragmentLength, context.alloc())
+                .forEach(chunk -> context.writeAndFlush(new DefaultHttpContent(chunk)));
+            context.writeAndFlush(new DefaultHttpContent(EMPTY_LINE.retainedSlice()));
         }
     }
 
@@ -72,7 +83,7 @@ final class ChannelSender extends SimpleChannelInboundHandler<FullHttpRequest> i
     }
 
     @Override
-    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
+    public void handlerAdded(final ChannelHandlerContext ctx) throws Exception {
         context = ctx;
         super.handlerAdded(ctx);
     }
diff --git a/protocol/restconf-server/src/main/java/org/opendaylight/restconf/server/RestconfStreamService.java b/protocol/restconf-server/src/main/java/org/opendaylight/restconf/server/RestconfStreamService.java
deleted file mode 100644 (file)
index 2bc03ac..0000000
+++ /dev/null
@@ -1,146 +0,0 @@
-/*
- * Copyright (c) 2024 PANTHEON.tech s.r.o. and others. All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-package org.opendaylight.restconf.server;
-
-import static java.util.Objects.requireNonNull;
-
-import com.google.common.annotations.VisibleForTesting;
-import io.netty.handler.codec.http.QueryStringDecoder;
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.io.UnsupportedEncodingException;
-import java.nio.charset.StandardCharsets;
-import java.util.List;
-import javax.xml.xpath.XPathExpressionException;
-import org.eclipse.jdt.annotation.NonNull;
-import org.opendaylight.netconf.transport.http.ErrorResponseException;
-import org.opendaylight.netconf.transport.http.EventStreamListener;
-import org.opendaylight.netconf.transport.http.EventStreamService;
-import org.opendaylight.restconf.api.QueryParameters;
-import org.opendaylight.restconf.api.query.PrettyPrintParam;
-import org.opendaylight.restconf.server.api.EventStreamGetParams;
-import org.opendaylight.restconf.server.api.ServerError;
-import org.opendaylight.restconf.server.api.YangErrorsBody;
-import org.opendaylight.restconf.server.spi.ErrorTagMapping;
-import org.opendaylight.restconf.server.spi.RestconfStream;
-import org.opendaylight.yangtools.yang.common.ErrorTag;
-import org.opendaylight.yangtools.yang.common.ErrorType;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public final class RestconfStreamService implements EventStreamService {
-    private static final Logger LOG = LoggerFactory.getLogger(RestconfStreamService.class);
-
-    @VisibleForTesting
-    static final String INVALID_STREAM_URI_ERROR = "Invalid stream URI";
-    @VisibleForTesting
-    static final String MISSING_PARAMS_ERROR = "Both stream encoding and stream name are required.";
-    @VisibleForTesting
-    static final String UNKNOWN_STREAM_ERROR = "Requested stream does not exist";
-
-    private static final int ERROR_BUF_SIZE = 2048;
-
-    private final RestconfStream.Registry streamRegistry;
-    private final String basePath;
-    private final ErrorTagMapping errorTagMapping;
-    private final RestconfStream.EncodingName defaultEncoding;
-    private final PrettyPrintParam defaultPrettyPrint;
-
-    public RestconfStreamService(final RestconfStream.Registry registry, final String restconf,
-            final ErrorTagMapping errorTagMapping, final MessageEncoding defaultEncoding,
-            final PrettyPrintParam defaultPrettyPrint) {
-        streamRegistry = requireNonNull(registry);
-        basePath = requireNonNull(restconf);
-        this.defaultEncoding = defaultEncoding.streamEncodingName();
-        this.errorTagMapping = errorTagMapping;
-        this.defaultPrettyPrint = defaultPrettyPrint;
-    }
-
-    @Override
-    public void startEventStream(final @NonNull String requestUri, final @NonNull EventStreamListener listener,
-            final @NonNull StartCallback callback) {
-        // parse URI.
-        // pattern /basePath/streams/streamEncoding/streamName
-        final var decoder = new QueryStringDecoder(requestUri);
-        final var pathParams = PathParameters.from(decoder.path(), basePath);
-        if (!PathParameters.STREAMS.equals(pathParams.apiResource())) {
-            callback.onStartFailure(errorResponse(ErrorTag.DATA_MISSING, INVALID_STREAM_URI_ERROR, defaultEncoding));
-            return;
-        }
-        final var args = pathParams.childIdentifier().split("/", 2);
-        final var streamEncoding = encoding(args[0]);
-        final var streamName = args.length > 1 ? args[1] : null;
-        if (streamEncoding == null || streamName == null || streamName.isEmpty()) {
-            callback.onStartFailure(errorResponse(ErrorTag.BAD_ATTRIBUTE, MISSING_PARAMS_ERROR,
-                streamEncoding == null ? defaultEncoding : streamEncoding));
-            return;
-        }
-
-        // find stream by name
-        final var stream = streamRegistry.lookupStream(streamName);
-        if (stream == null) {
-            callback.onStartFailure(errorResponse(ErrorTag.DATA_MISSING, UNKNOWN_STREAM_ERROR, streamEncoding));
-            return;
-        }
-
-        // Try starting stream via registry stream subscriber
-        final var sender = new RestconfStream.Sender() {
-            @Override
-            public void sendDataMessage(final String data) {
-                listener.onEventField("data", data);
-            }
-
-            @Override
-            public void endOfStream() {
-                listener.onStreamEnd();
-            }
-        };
-        final var streamParams = EventStreamGetParams.of(QueryParameters.ofMultiValue(decoder.parameters()));
-        try {
-            final var registration = stream.addSubscriber(sender, streamEncoding, streamParams);
-            if (registration != null) {
-                callback.onStreamStarted(registration::close);
-            } else {
-                callback.onStartFailure(errorResponse(ErrorTag.DATA_MISSING, UNKNOWN_STREAM_ERROR, streamEncoding));
-            }
-        } catch (UnsupportedEncodingException | XPathExpressionException | IllegalArgumentException e) {
-            callback.onStartFailure(errorResponse(ErrorTag.BAD_ATTRIBUTE, e.getMessage(), streamEncoding));
-        }
-    }
-
-    private static RestconfStream.EncodingName encoding(final String encodingName) {
-        try {
-            return new RestconfStream.EncodingName(encodingName);
-        } catch (IllegalArgumentException e) {
-            LOG.warn("Stream encoding name '{}' is invalid: {}. Ignored.", encodingName, e.getMessage());
-            return null;
-        }
-    }
-
-    private Exception errorResponse(final ErrorTag errorTag, final String errorMessage,
-            final RestconfStream.EncodingName encoding) {
-        final var yangErrorsBody =
-            new YangErrorsBody(List.of(new ServerError(ErrorType.PROTOCOL, errorTag, errorMessage)));
-        final var statusCode = errorTagMapping.statusOf(errorTag).code();
-        try (var out = new ByteArrayOutputStream(ERROR_BUF_SIZE)) {
-            if (RestconfStream.EncodingName.RFC8040_JSON.equals(encoding)) {
-                yangErrorsBody.formatToJSON(defaultPrettyPrint, out);
-                return new ErrorResponseException(statusCode, out.toString(StandardCharsets.UTF_8),
-                    NettyMediaTypes.APPLICATION_YANG_DATA_JSON);
-            } else {
-                yangErrorsBody.formatToXML(defaultPrettyPrint, out);
-                return new ErrorResponseException(statusCode, out.toString(StandardCharsets.UTF_8),
-                    NettyMediaTypes.APPLICATION_YANG_DATA_XML);
-            }
-        } catch (IOException e) {
-            LOG.error("Failure encoding error message", e);
-            // return as plain text
-            return new IllegalStateException(errorMessage);
-        }
-    }
-}
index 982157012fa6ba2d918dd7a22c2effee3d846190..9f3c81fd37e717710f601126381e68f537211907 100644 (file)
@@ -16,7 +16,6 @@ import org.eclipse.jdt.annotation.NonNull;
 import org.eclipse.jdt.annotation.NonNullByDefault;
 import org.opendaylight.netconf.transport.api.TransportChannelListener;
 import org.opendaylight.netconf.transport.http.HTTPTransportChannel;
-import org.opendaylight.netconf.transport.http.ServerSseHandler;
 import org.opendaylight.restconf.server.api.RestconfServer;
 import org.opendaylight.restconf.server.spi.RestconfStream;
 import org.slf4j.Logger;
@@ -53,7 +52,9 @@ final class RestconfTransportChannelListener implements TransportChannelListener
 
         root = new EndpointRoot(principalService, new WellKnownResources(restconf), firstSegment,
             new APIResource(server, otherSegments, sb.append('/').toString(), configuration.errorTagMapping(),
-                configuration.defaultEncoding(), configuration.prettyPrint(), streamRegistry), streamRegistry);
+                configuration.defaultEncoding(), configuration.prettyPrint(),
+                configuration.sseHeartbeatIntervalMillis().intValue(),
+                configuration.sseMaximumFragmentLength().intValue(), streamRegistry), streamRegistry);
 
         LOG.info("Initialized with service {}", server.getClass());
         LOG.info("Initialized with base path: {}, default encoding: {}, default pretty print: {}", restconf,
@@ -69,12 +70,7 @@ final class RestconfTransportChannelListener implements TransportChannelListener
     public void onTransportChannelEstablished(final HTTPTransportChannel channel) {
         final var session = new RestconfSession(channel.scheme(), root);
         final var nettyChannel = channel.channel();
-        nettyChannel.pipeline().addLast(
-            new ServerSseHandler(
-                new RestconfStreamService(streamRegistry, restconf, configuration.errorTagMapping(),
-                    configuration.defaultEncoding(), configuration.prettyPrint()),
-                configuration.sseMaximumFragmentLength().toJava(), configuration.sseHeartbeatIntervalMillis().toJava()),
-            session);
+        nettyChannel.pipeline().addLast(session);
     }
 
     @Override
index 2e77afa1fe061fc285f2d175d11e3b2c24f32755..1802af3b195cf83f347e99ca849161ad6a18a5b7 100644 (file)
@@ -26,6 +26,7 @@ import javax.xml.xpath.XPathExpressionException;
 import org.eclipse.jdt.annotation.NonNullByDefault;
 import org.eclipse.jdt.annotation.Nullable;
 import org.opendaylight.netconf.transport.http.EmptyResponse;
+import org.opendaylight.netconf.transport.http.EventStreamResponse;
 import org.opendaylight.netconf.transport.http.HeadersResponse;
 import org.opendaylight.netconf.transport.http.ImplementedMethod;
 import org.opendaylight.netconf.transport.http.PreparedRequest;
@@ -50,11 +51,16 @@ final class StreamsResource extends AbstractLeafResource {
     private static final AsciiString STREAM_ID = HttpConversionUtil.ExtensionHeaderNames.STREAM_ID.text();
 
     private final RestconfStream.Registry streamRegistry;
+    private final int sseHeartbeatIntervalMillis;
+    private final int sseMaximumFragmentLength;
     private final Map<Integer, Registration> senders = new HashMap<>();
 
-    StreamsResource(final EndpointInvariants invariants, final RestconfStream.Registry streamRegistry) {
+    StreamsResource(final EndpointInvariants invariants, final RestconfStream.Registry streamRegistry,
+            final int sseHeartbeatIntervalMillis, final int sseMaximumFragmentLength) {
         super(invariants);
         this.streamRegistry = requireNonNull(streamRegistry);
+        this.sseHeartbeatIntervalMillis = sseHeartbeatIntervalMillis;
+        this.sseMaximumFragmentLength = sseMaximumFragmentLength;
     }
 
     @Override
@@ -98,7 +104,7 @@ final class StreamsResource extends AbstractLeafResource {
             return EmptyResponse.NOT_FOUND;
         }
 
-        if (headers.contains(HttpHeaderNames.ACCEPT, HttpHeaderValues.TEXT_EVENT_STREAM, false)) {
+        if (!headers.contains(HttpHeaderNames.ACCEPT, HttpHeaderValues.TEXT_EVENT_STREAM, false)) {
             return new EmptyResponse(HttpResponseStatus.NOT_ACCEPTABLE);
         }
 
@@ -122,23 +128,20 @@ final class StreamsResource extends AbstractLeafResource {
     // not be servicing any other requests.
     private PreparedRequest switchToEventStream(final RestconfStream<?> stream,
             final RestconfStream.EncodingName encoding, final EventStreamGetParams params) {
-        final var sender = new ChannelSender();
+        final var sender = new ChannelSender(sseMaximumFragmentLength);
         final Registration registration = registerSender(stream, encoding, params, sender);
 
         if (registration == null) {
             return EmptyResponse.NOT_FOUND;
         }
-        final var ctx = sender.getCtx();
 
-        // Replace ourselves with the sender and enable it wil the registration
-        ctx.channel().pipeline().replace(ctx.handler(), null, sender);
         sender.enable(registration);
-        return EmptyResponse.OK;
+        return new EventStreamResponse(HttpResponseStatus.OK, sender, sseHeartbeatIntervalMillis);
     }
 
     // HTTP/2 event stream start.
     private PreparedRequest addEventStream(final Integer streamId, final RestconfStream<?> stream,
-        final RestconfStream.EncodingName encoding, final EventStreamGetParams params) {
+            final RestconfStream.EncodingName encoding, final EventStreamGetParams params) {
         final var sender = new StreamSender(streamId);
         final var registration = registerSender(stream, encoding, params, sender);
         if (registration == null) {
index ddc155bd521a0988cf740a4261bfb681b39b15d0..1754a234c30c248b5f4f1bfd15800ed64466cc7e 100644 (file)
@@ -87,7 +87,7 @@ class AbstractRequestProcessorTest {
         session = new RestconfSession(HTTPScheme.HTTP,
             new EndpointRoot(principalService, WELL_KNOWN, BASE_PATH.substring(1),
                 new APIResource(server, List.of(), "/rests/", ERROR_TAG_MAPPING, MessageEncoding.JSON, PRETTY_PRINT,
-                    streamRegistry), streamRegistry));
+                    1000, Integer.MAX_VALUE, streamRegistry), streamRegistry));
         doReturn(channel).when(ctx).channel();
         doReturn(new InetSocketAddress(0)).when(channel).remoteAddress();
         session.handlerAdded(ctx);
index 08a1d1feeb59ae381ec6542793adaa675556446e..50b4dbfe326d86da85bd7d719f0fc56c203251d5 100644 (file)
@@ -66,7 +66,7 @@ class RestconfSessionTest {
         doReturn(channel).when(ctx).channel();
         doReturn(new InetSocketAddress(0)).when(channel).remoteAddress();
         doReturn(pipeline).when(channel).pipeline();
-        doReturn(pipeline).when(pipeline).addLast(any(ChannelHandler.class), any());
+        doReturn(pipeline).when(pipeline).addLast(any(ChannelHandler.class));
         doReturn(HTTPScheme.HTTP).when(transportChannel).scheme();
         // default config just for testing purposes
         final var configuration = new NettyEndpointConfiguration(ErrorTagMapping.RFC8040, PrettyPrintParam.TRUE,
@@ -77,7 +77,7 @@ class RestconfSessionTest {
             configuration);
         listener.onTransportChannelEstablished(transportChannel);
         // capture created session
-        verify(pipeline).addLast(any(ChannelHandler.class), sessionCaptor.capture());
+        verify(pipeline).addLast(sessionCaptor.capture());
         final var session = sessionCaptor.getValue();
         session.handlerAdded(ctx);
 
diff --git a/protocol/restconf-server/src/test/java/org/opendaylight/restconf/server/RestconfStreamServiceTest.java b/protocol/restconf-server/src/test/java/org/opendaylight/restconf/server/RestconfStreamServiceTest.java
deleted file mode 100644 (file)
index cc4062d..0000000
+++ /dev/null
@@ -1,223 +0,0 @@
-/*
- * Copyright (c) 2024 PANTHEON.tech s.r.o. and others. All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-package org.opendaylight.restconf.server;
-
-import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertInstanceOf;
-import static org.junit.jupiter.api.Assertions.assertNotNull;
-import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.ArgumentMatchers.eq;
-import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.doThrow;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-import static org.opendaylight.restconf.server.PathParameters.STREAMS;
-import static org.opendaylight.restconf.server.RestconfStreamService.INVALID_STREAM_URI_ERROR;
-import static org.opendaylight.restconf.server.RestconfStreamService.MISSING_PARAMS_ERROR;
-import static org.opendaylight.restconf.server.RestconfStreamService.UNKNOWN_STREAM_ERROR;
-import static org.opendaylight.restconf.server.TestUtils.ERROR_TAG_MAPPING;
-import static org.opendaylight.restconf.server.TestUtils.assertErrorContent;
-
-import io.netty.handler.codec.http.QueryStringEncoder;
-import java.nio.charset.Charset;
-import java.util.List;
-import java.util.Map;
-import java.util.stream.Stream;
-import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.extension.ExtendWith;
-import org.junit.jupiter.params.ParameterizedTest;
-import org.junit.jupiter.params.provider.Arguments;
-import org.junit.jupiter.params.provider.MethodSource;
-import org.junit.jupiter.params.provider.ValueSource;
-import org.mockito.ArgumentCaptor;
-import org.mockito.Captor;
-import org.mockito.Mock;
-import org.mockito.junit.jupiter.MockitoExtension;
-import org.opendaylight.netconf.transport.http.ErrorResponseException;
-import org.opendaylight.netconf.transport.http.EventStreamListener;
-import org.opendaylight.netconf.transport.http.EventStreamService;
-import org.opendaylight.restconf.api.QueryParameters;
-import org.opendaylight.restconf.api.query.PrettyPrintParam;
-import org.opendaylight.restconf.api.query.SkipNotificationDataParam;
-import org.opendaylight.restconf.api.query.StartTimeParam;
-import org.opendaylight.restconf.api.query.StopTimeParam;
-import org.opendaylight.restconf.server.TestUtils.TestEncoding;
-import org.opendaylight.restconf.server.api.EventStreamGetParams;
-import org.opendaylight.restconf.server.spi.RestconfStream;
-import org.opendaylight.yangtools.concepts.Registration;
-import org.opendaylight.yangtools.yang.common.ErrorTag;
-
-@ExtendWith(MockitoExtension.class)
-class RestconfStreamServiceTest {
-
-    private static final String BASE_PATH = "/rests";
-    private static final String URI_PREFIX = BASE_PATH + STREAMS;
-    private static final String URI_TEMPLATE = URI_PREFIX + "/%s/%s";
-    private static final String XML = "xml";
-    private static final String JSON = "json";
-    private static final String VALID_STREAM_NAME = "valid-stream-name";
-    private static final String INVALID_STREAM_NAME = "invalid-stream-name";
-    private static final String ERROR_MESSAGE = "error-message";
-    private static final EventStreamGetParams EMPTY_PARAMS = EventStreamGetParams.of(QueryParameters.of());
-
-    @Mock
-    private RestconfStream.Registry registry;
-    @Mock
-    private RestconfStream<?> stream;
-    @Mock
-    private EventStreamListener listener;
-    @Mock
-    private EventStreamService.StartCallback callback;
-    @Mock
-    private Registration registration;
-
-    @Captor
-    private ArgumentCaptor<Exception> exceptionCaptor;
-    @Captor
-    private ArgumentCaptor<EventStreamService.StreamControl> controlCaptor;
-    @Captor
-    private ArgumentCaptor<EventStreamGetParams> getParamsCaptor;
-    @Captor
-    private ArgumentCaptor<RestconfStream.Sender> senderCaptor;
-
-    private RestconfStreamService streamService;
-
-    @BeforeEach
-    void beforeEach() {
-        streamService = new RestconfStreamService(registry, BASE_PATH, ERROR_TAG_MAPPING, MessageEncoding.JSON,
-            PrettyPrintParam.FALSE);
-    }
-
-    @ParameterizedTest
-    @MethodSource
-    void uriParseFailure(final String uri, final String expectedFormat, final ErrorTag expectedErrorTag,
-            final String expectedMessage) {
-        streamService.startEventStream(uri, listener, callback);
-        verify(callback).onStartFailure(exceptionCaptor.capture());
-        assertErrorResponseException(exceptionCaptor.getValue(), expectedFormat, expectedErrorTag, expectedMessage);
-    }
-
-    private static Stream<Arguments> uriParseFailure() {
-        return Stream.of(
-            // uri, expectedFormat, expectedErrorTag, expectedMessage
-            Arguments.of("/", JSON, ErrorTag.DATA_MISSING, INVALID_STREAM_URI_ERROR),
-            Arguments.of("/smth", JSON, ErrorTag.DATA_MISSING, INVALID_STREAM_URI_ERROR),
-            Arguments.of("/rests", JSON, ErrorTag.DATA_MISSING, INVALID_STREAM_URI_ERROR),
-            Arguments.of(URI_PREFIX, JSON, ErrorTag.BAD_ATTRIBUTE, MISSING_PARAMS_ERROR),
-            Arguments.of(URI_PREFIX + "/..", JSON, ErrorTag.BAD_ATTRIBUTE, MISSING_PARAMS_ERROR),
-            Arguments.of(URI_PREFIX + "/" + JSON, JSON, ErrorTag.BAD_ATTRIBUTE, MISSING_PARAMS_ERROR),
-            Arguments.of(URI_PREFIX + "/" + XML, XML, ErrorTag.BAD_ATTRIBUTE, MISSING_PARAMS_ERROR),
-            Arguments.of(URI_PREFIX + "/" + XML + "/", XML, ErrorTag.BAD_ATTRIBUTE, MISSING_PARAMS_ERROR)
-        );
-    }
-
-    @ParameterizedTest
-    @ValueSource(strings = {XML, JSON})
-    void streamNotFoundFailure(final String encoding) {
-        doReturn(null).when(registry).lookupStream(INVALID_STREAM_NAME);
-        final var uri = URI_TEMPLATE.formatted(encoding, INVALID_STREAM_NAME);
-        streamService.startEventStream(uri, listener, callback);
-        verify(callback).onStartFailure(exceptionCaptor.capture());
-        assertErrorResponseException(exceptionCaptor.getValue(), encoding, ErrorTag.DATA_MISSING, UNKNOWN_STREAM_ERROR);
-    }
-
-    @ParameterizedTest
-    @ValueSource(strings = {XML, JSON})
-    void streamSubscribeFailureWithException(final String encoding) throws Exception {
-        final var encodingName = new RestconfStream.EncodingName(encoding);
-        doReturn(stream).when(registry).lookupStream(VALID_STREAM_NAME);
-        doThrow(new IllegalArgumentException(ERROR_MESSAGE))
-            .when(stream).addSubscriber(any(RestconfStream.Sender.class), eq(encodingName), eq(EMPTY_PARAMS));
-
-        final var uri = URI_TEMPLATE.formatted(encoding, VALID_STREAM_NAME);
-        streamService.startEventStream(uri, listener, callback);
-        verify(callback).onStartFailure(exceptionCaptor.capture());
-        assertErrorResponseException(exceptionCaptor.getValue(), encoding, ErrorTag.BAD_ATTRIBUTE, ERROR_MESSAGE);
-    }
-
-    @ParameterizedTest
-    @ValueSource(strings = {XML, JSON})
-    void streamSubscribeFailureWithoutException(final String encoding) throws Exception {
-        final var encodingName = new RestconfStream.EncodingName(encoding);
-        doReturn(stream).when(registry).lookupStream(VALID_STREAM_NAME);
-        doReturn(null).when(stream).addSubscriber(any(RestconfStream.Sender.class),
-            eq(encodingName), eq(EMPTY_PARAMS));
-
-        final var uri = URI_TEMPLATE.formatted(encoding, VALID_STREAM_NAME);
-        streamService.startEventStream(uri, listener, callback);
-        verify(callback).onStartFailure(exceptionCaptor.capture());
-        assertErrorResponseException(exceptionCaptor.getValue(), encoding, ErrorTag.DATA_MISSING, UNKNOWN_STREAM_ERROR);
-    }
-
-    private static void assertErrorResponseException(final Exception caught, final String expectedFormat,
-            final ErrorTag expectedErrorTag, final String expectedMessage) {
-        final var exception = assertInstanceOf(ErrorResponseException.class, caught);
-        final var expectedStatusCode = ERROR_TAG_MAPPING.statusOf(expectedErrorTag).code();
-        assertEquals(expectedStatusCode, exception.statusCode());
-        final var encoding = JSON.equals(expectedFormat) ? TestEncoding.JSON : TestEncoding.XML;
-        assertEquals(encoding.responseType.toString(), exception.contentType());
-        assertErrorContent(exception.getMessage(), encoding, expectedErrorTag, expectedMessage);
-    }
-
-    @ParameterizedTest
-    @MethodSource
-    void subscribeSuccess(final String encoding, final Map<String, List<String>> queryParams) throws Exception {
-        final var encodingName = new RestconfStream.EncodingName(encoding);
-        doReturn(stream).when(registry).lookupStream(VALID_STREAM_NAME);
-        doReturn(registration).when(stream).addSubscriber(any(), eq(encodingName), any(EventStreamGetParams.class));
-
-        // build uri with query parameters
-        final var uriEncoder = new QueryStringEncoder(URI_TEMPLATE.formatted(encoding, VALID_STREAM_NAME),
-            Charset.defaultCharset());
-        queryParams.forEach((name, list) -> {
-            if (!list.isEmpty()) {
-                uriEncoder.addParam(name, list.get(0));
-            }
-        });
-        streamService.startEventStream(uriEncoder.toString(), listener, callback);
-        verify(stream).addSubscriber(senderCaptor.capture(), eq(encodingName), getParamsCaptor.capture());
-
-        // verify params passed
-        final var params = getParamsCaptor.getValue();
-        assertNotNull(params);
-        if (!queryParams.isEmpty()) {
-            final var expected = EventStreamGetParams.of(QueryParameters.ofMultiValue(queryParams));
-            assertNotNull(params.startTime());
-            assertNotNull(params.stopTime());
-            assertNotNull(params.skipNotificationData());
-            assertEquals(expected.startTime().paramValue(), params.startTime().paramValue());
-            assertEquals(expected.stopTime().paramValue(), params.stopTime().paramValue());
-            assertEquals(expected.skipNotificationData().paramValue(), params.skipNotificationData().paramValue());
-        }
-
-        // verify sender object passed is linked to listener
-        final var sender = senderCaptor.getValue();
-        assertNotNull(sender);
-        sender.sendDataMessage("test");
-        verify(listener, times(1)).onEventField("data", "test");
-        sender.endOfStream();
-        verify(listener, times(1)).onStreamEnd();
-
-        // verify returned control object terminates subscription registration
-        verify(callback).onStreamStarted(controlCaptor.capture());
-        final var control = controlCaptor.getValue();
-        assertNotNull(control);
-        control.close();
-        verify(registration, times(1)).close();
-    }
-
-    private static Stream<Arguments> subscribeSuccess() {
-        return Stream.of(
-            Arguments.of(JSON, Map.of()),
-            Arguments.of(XML, Map.of(
-                StartTimeParam.uriName, List.of("2024-08-05T09:00:00Z"),
-                StopTimeParam.uriName, List.of("2024-08-05T18:00:00Z"),
-                SkipNotificationDataParam.uriName, List.of("true")))
-        );
-    }
-}
index 38a892e97b10535f9bba859c6abb90675942697b..d4938180c3f2ad8d5e80bd70c39c1663aa6fe641 100644 (file)
@@ -59,7 +59,7 @@ final class ClientHttp1SseService implements EventStreamService {
     }
 
     @Override
-    public void startEventStream(final String requestUri, final EventStreamListener listener,
+    public void startEventStream(final String host, final String requestUri, final EventStreamListener listener,
             final StartCallback callback) {
         if (!channel.isActive()) {
             callback.onStartFailure(new IllegalStateException("Connection is closed"));
@@ -92,7 +92,8 @@ final class ClientHttp1SseService implements EventStreamService {
         request.headers()
             .set(HttpHeaderNames.ACCEPT, HttpHeaderValues.TEXT_EVENT_STREAM)
             .set(HttpHeaderNames.CONNECTION, HttpHeaderValues.KEEP_ALIVE)
-            .set(HttpHeaderNames.TRANSFER_ENCODING, HttpHeaderValues.CHUNKED);
+            .set(HttpHeaderNames.TRANSFER_ENCODING, HttpHeaderValues.CHUNKED)
+            .set(HttpHeaderNames.HOST, host);
         channel.writeAndFlush(request);
         LOG.debug("SSE request sent to {}", requestUri);
     }
index 6ba94b68a46eaf34b566fa7cebecf83d6e9d55a9..4936bcb1eb2148ae63f2d5930f6d90130b41e605 100644 (file)
@@ -82,7 +82,7 @@ final class ClientHttp2SseService extends ChannelInboundHandlerAdapter
     }
 
     @Override
-    public void startEventStream(final String requestUri, final EventStreamListener listener,
+    public void startEventStream(final String host, final String requestUri, final EventStreamListener listener,
             final StartCallback callback) {
         if (!channel.isActive()) {
             callback.onStartFailure(new IllegalStateException("Connection is closed"));
diff --git a/transport/transport-http/src/main/java/org/opendaylight/netconf/transport/http/EventStreamResponse.java b/transport/transport-http/src/main/java/org/opendaylight/netconf/transport/http/EventStreamResponse.java
new file mode 100644 (file)
index 0000000..c18940c
--- /dev/null
@@ -0,0 +1,87 @@
+/*
+ * Copyright (c) 2025 PANTHEON.tech, s.r.o. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.netconf.transport.http;
+
+import static io.netty.handler.codec.http2.HttpConversionUtil.ExtensionHeaderNames.STREAM_ID;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import io.netty.channel.ChannelHandler;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.handler.codec.http.DefaultHttpContent;
+import io.netty.handler.codec.http.DefaultHttpResponse;
+import io.netty.handler.codec.http.HttpHeaderNames;
+import io.netty.handler.codec.http.HttpHeaderValues;
+import io.netty.handler.codec.http.HttpMessage;
+import io.netty.handler.codec.http.HttpResponseStatus;
+import io.netty.handler.codec.http.HttpVersion;
+import java.util.concurrent.TimeUnit;
+
+public final class EventStreamResponse implements Response {
+    private static final ByteBuf PING_MESSAGE =
+        Unpooled.wrappedBuffer(new byte[] { ':', 'p', 'i', 'n', 'g', '\r', '\n', '\r', '\n' }).asReadOnly();
+    private static final String CHANNEL_SENDER_CONTEXT_NAME = "ChannelSenderContext";
+
+    private final ChannelHandler sender;
+    private final HttpResponseStatus status;
+    private final int sseHeartbeatIntervalMillis;
+
+    private ChannelHandlerContext context = null;
+
+    public EventStreamResponse(final HttpResponseStatus status, final ChannelHandler sender,
+            int sseHeartbeatIntervalMillis) {
+        this.status = status;
+        this.sender = sender;
+        this.sseHeartbeatIntervalMillis = sseHeartbeatIntervalMillis;
+    }
+
+    @Override
+    public HttpResponseStatus status() {
+        return status;
+    }
+
+    public DefaultHttpResponse start(final ChannelHandlerContext ctx, final Integer streamId,
+            final HttpVersion version) {
+        // Replace handler with the sender and get new context
+        context = ctx.channel().pipeline().replace(ctx.handler(), CHANNEL_SENDER_CONTEXT_NAME, sender)
+            .context(CHANNEL_SENDER_CONTEXT_NAME);
+
+        final var response = new DefaultHttpResponse(version, HttpResponseStatus.OK);
+        response.headers()
+            .set(HttpHeaderNames.CONTENT_TYPE, HttpHeaderValues.TEXT_EVENT_STREAM)
+            .set(HttpHeaderNames.CONNECTION, HttpHeaderValues.KEEP_ALIVE)
+            .set(HttpHeaderNames.TRANSFER_ENCODING, HttpHeaderValues.CHUNKED);
+        copyStreamId(streamId, response);
+
+        if (sseHeartbeatIntervalMillis > 0) {
+            schedulePing();
+        }
+        return response;
+    }
+
+    private void schedulePing() {
+        context.executor().schedule(this::sendPing, sseHeartbeatIntervalMillis, TimeUnit.MILLISECONDS);
+    }
+
+    private void sendPing() {
+        if (isChannelWritable()) {
+            context.writeAndFlush(new DefaultHttpContent(PING_MESSAGE.retainedSlice()));
+            schedulePing();
+        }
+    }
+
+    private boolean isChannelWritable() {
+        return context != null && !context.isRemoved() && context.channel().isActive();
+    }
+
+    static void copyStreamId(final Integer streamId, final HttpMessage to) {
+        if (streamId != null) {
+            to.headers().setInt(STREAM_ID.text(), streamId);
+        }
+    }
+}
index 5271f310413b3a733b9425620a6caa65a5489a67..06767dfea9e685988ed03c06af2092a77beca46b 100644 (file)
@@ -26,11 +26,12 @@ public interface EventStreamService {
      * <p>If request is declined then {@link StartCallback#onStartFailure(Exception)} method invoked with
      * {@link Exception} describing the decline reason.
      *
+     * @param host host to be used in headers of stream request
      * @param requestUri stream request URI
      * @param listener SSE event consumer
      * @param callback SSE stream request callback
      */
-    void startEventStream(String requestUri, EventStreamListener listener, StartCallback callback);
+    void startEventStream(String host, String requestUri, EventStreamListener listener, StartCallback callback);
 
     /**
      * Invoked when the request to attach to an event stream finishes.
index beea59a3abefa7eb60fcba160bbbb4481eb734aa..1fc7c47bd124fccfb13e67245caa27d508786cb6 100644 (file)
@@ -20,6 +20,7 @@ import io.netty.handler.codec.http.FullHttpResponse;
 import io.netty.handler.codec.http.HttpHeaderNames;
 import io.netty.handler.codec.http.HttpHeaders;
 import io.netty.handler.codec.http.HttpMethod;
+import io.netty.handler.codec.http.HttpResponse;
 import io.netty.handler.codec.http.HttpResponseStatus;
 import io.netty.handler.codec.http.HttpUtil;
 import io.netty.handler.codec.http.HttpVersion;
@@ -255,7 +256,7 @@ public abstract class HTTPServerSession extends SimpleChannelInboundHandler<Full
 
     @NonNullByDefault
     static final void respond(final ChannelHandlerContext ctx, final @Nullable Integer streamId,
-            final FullHttpResponse response) {
+            final HttpResponse response) {
         requireNonNull(response);
         if (streamId != null) {
             response.headers().setInt(STREAM_ID, streamId);
index 1f4c0fc1dfda5804e39a0cd93b3fdba0b954b3ba..d55ad0741b366d4562ced05cc2210e1a7ad5d55c 100644 (file)
@@ -15,7 +15,7 @@ import org.eclipse.jdt.annotation.NonNullByDefault;
  * {@link #asResponse()}. It has a {@link #status()} and can be turned into (a series of) HTTP codec objects.
  */
 @NonNullByDefault
-public sealed interface Response extends CompletedRequest permits ReadyResponse, FiniteResponse {
+public sealed interface Response extends CompletedRequest permits ReadyResponse, FiniteResponse, EventStreamResponse {
     @Override
     default Response asResponse() {
         return this;
index f7b2e82b5c69fa8d15c0799a19cba3b8e7cc29e4..5aa40910e063312b4f53490233dc2545fbdeb97e 100644 (file)
@@ -119,6 +119,8 @@ final class ServerRequestExecutor implements PendingRequestListener {
         switch (response) {
             case ReadyResponse resp -> HTTPServerSession.respond(ctx, streamId, resp.toHttpResponse(version));
             case FiniteResponse resp -> respond(ctx, streamId, version, resp);
+            case EventStreamResponse resp -> HTTPServerSession.respond(ctx, streamId,
+                resp.start(ctx, streamId, version));
         }
     }
 
diff --git a/transport/transport-http/src/main/java/org/opendaylight/netconf/transport/http/ServerSseHandler.java b/transport/transport-http/src/main/java/org/opendaylight/netconf/transport/http/ServerSseHandler.java
deleted file mode 100644 (file)
index 7002116..0000000
+++ /dev/null
@@ -1,227 +0,0 @@
-/*
- * Copyright (c) 2024 PANTHEON.tech s.r.o. and others. All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-package org.opendaylight.netconf.transport.http;
-
-import static io.netty.handler.codec.http2.HttpConversionUtil.ExtensionHeaderNames.STREAM_ID;
-import static java.util.Objects.requireNonNull;
-import static org.opendaylight.netconf.transport.http.SseUtils.chunksOf;
-
-import io.netty.buffer.ByteBuf;
-import io.netty.buffer.Unpooled;
-import io.netty.channel.ChannelHandlerContext;
-import io.netty.channel.ChannelInboundHandlerAdapter;
-import io.netty.handler.codec.http.DefaultFullHttpResponse;
-import io.netty.handler.codec.http.DefaultHttpContent;
-import io.netty.handler.codec.http.DefaultHttpResponse;
-import io.netty.handler.codec.http.DefaultLastHttpContent;
-import io.netty.handler.codec.http.FullHttpRequest;
-import io.netty.handler.codec.http.HttpHeaderNames;
-import io.netty.handler.codec.http.HttpHeaderValues;
-import io.netty.handler.codec.http.HttpMessage;
-import io.netty.handler.codec.http.HttpMethod;
-import io.netty.handler.codec.http.HttpResponseStatus;
-import io.netty.handler.codec.http.LastHttpContent;
-import java.nio.charset.StandardCharsets;
-import java.util.concurrent.TimeUnit;
-import org.opendaylight.netconf.transport.http.EventStreamService.StartCallback;
-import org.opendaylight.netconf.transport.http.EventStreamService.StreamControl;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Server side Server-Sent Event (SSE) Handler.
- *
- * <p>Intercepts GET requests with {@code accept=text/event-stream} header, invokes the
- * {@link EventStreamService#startEventStream(String, EventStreamListener, StartCallback)} using
- * request {@code URI} as parameter. If request is accepted the handler starts an event stream as
- * {@code transfer-encoding=chunked} response: headers are sent immediately, body chunks are sent on
- * service events.
- *
- * <p>If request is not accepted then error response will be returned with error message as response body.
- * If decline exception is an instance of {@link ErrorResponseException} then explicitly defined
- * {@code content-type} value and response status code will be used in error response header.
- *
- * <p>@deprecated This class will be removed in next release and replaced by dedicated {@code AbstractResource}
- * handling RFC 8040 streams endpoint.
- */
-@Deprecated(forRemoval = true)
-public final class ServerSseHandler extends ChannelInboundHandlerAdapter implements EventStreamListener {
-    private static final Logger LOG = LoggerFactory.getLogger(ServerSseHandler.class);
-    private static final ByteBuf PING_MESSAGE =
-        Unpooled.wrappedBuffer(new byte[] { ':', 'p', 'i', 'n', 'g', '\r', '\n', '\r', '\n' }).asReadOnly();
-    private static final ByteBuf EMPTY_LINE = Unpooled.wrappedBuffer(new byte[] { '\r', '\n' }).asReadOnly();
-
-    private final int maxFieldValueLength;
-    private final long heartbeatIntervalMillis;
-    private final EventStreamService service;
-
-    private ChannelHandlerContext context;
-    private StreamControl eventStream;
-    private boolean streaming;
-
-    /**
-     * Default constructor.
-     *
-     * @param service the event stream service instance
-     * @param maxFieldValueLength max length of event message in chars, if parameter value is greater than zero and
-     *        message length exceeds the limit then message will split to sequence of shorter messages;
-     *        if parameter value is zero or less, then message length won't be checked
-     * @param heartbeatIntervalMillis the keep-alive ping message interval in milliseconds, if set to zero or less
-     *        no ping message will be sent by server
-     */
-    public ServerSseHandler(final EventStreamService service, final int maxFieldValueLength,
-            final long heartbeatIntervalMillis) {
-        this.service = requireNonNull(service);
-        this.maxFieldValueLength = maxFieldValueLength;
-        this.heartbeatIntervalMillis = heartbeatIntervalMillis;
-    }
-
-    @Override
-    public void handlerAdded(final ChannelHandlerContext ctx) throws Exception {
-        context = ctx;
-        ctx.channel().closeFuture().addListener(ignored -> unregister());
-        LOG.debug("Server SSE enabled on channel {}", context.channel());
-    }
-
-    @Override
-    public void channelRead(final ChannelHandlerContext ctx, final Object msg) throws Exception {
-        switch (msg) {
-            case FullHttpRequest request -> channelRead(ctx, request);
-            default -> super.channelRead(ctx, msg);
-        }
-    }
-
-    private void channelRead(final ChannelHandlerContext ctx, final FullHttpRequest request) {
-        if (streaming) {
-            LOG.warn("Ignoring unexpected request while SSE stream is active: {}", request);
-            return;
-        }
-
-        if (HttpMethod.GET.equals(request.method())
-            && request.headers().contains(HttpHeaderNames.ACCEPT, HttpHeaderValues.TEXT_EVENT_STREAM, true)) {
-
-            service.startEventStream(request.retain().uri(), this, new StartCallback() {
-                @Override
-                public void onStreamStarted(final StreamControl streamControl) {
-                    confirmEventStreamRequest(request, streamControl);
-                    request.release();
-                }
-
-                @Override
-                public void onStartFailure(final Exception exception) {
-                    declineEventStreamRequest(request, exception);
-                    request.release();
-                }
-            });
-            return;
-        }
-
-        // pass request to next handler
-        ctx.fireChannelRead(request);
-    }
-
-    private void confirmEventStreamRequest(final FullHttpRequest request, final StreamControl startedStream) {
-        eventStream = startedStream;
-        streaming = true;
-        // response OK with headers only, body chunks will be an event stream
-        final var response = new DefaultHttpResponse(request.protocolVersion(), HttpResponseStatus.OK);
-        response.headers()
-            .set(HttpHeaderNames.CONTENT_TYPE, HttpHeaderValues.TEXT_EVENT_STREAM)
-            .set(HttpHeaderNames.CONNECTION, HttpHeaderValues.KEEP_ALIVE)
-            .set(HttpHeaderNames.TRANSFER_ENCODING, HttpHeaderValues.CHUNKED);
-        copyStreamId(request, response);
-        context.writeAndFlush(response);
-        // schedule keep-alive events (no action required SSE 'ping' comment) if necessary
-        if (heartbeatIntervalMillis > 0) {
-            schedulePing();
-        }
-        LOG.debug("Event Stream request accepted for URI={}", request.uri());
-    }
-
-    private void declineEventStreamRequest(final FullHttpRequest request, final Throwable exception) {
-        final var errorMessage = exception.getMessage();
-        LOG.debug("Event Stream request declined for URI={} -> {}", request.uri(), errorMessage);
-        final HttpResponseStatus status;
-        final CharSequence contentType;
-        if (exception instanceof ErrorResponseException errorResponse) {
-            status = HttpResponseStatus.valueOf(errorResponse.statusCode());
-            contentType = errorResponse.contentType();
-        } else {
-            status = HttpResponseStatus.BAD_REQUEST;
-            contentType = HttpHeaderValues.TEXT_PLAIN;
-        }
-        final var response = new DefaultFullHttpResponse(request.protocolVersion(), status,
-            Unpooled.wrappedBuffer(errorMessage.getBytes(StandardCharsets.UTF_8)));
-        response.headers().set(HttpHeaderNames.CONTENT_TYPE, contentType)
-            .setInt(HttpHeaderNames.CONTENT_LENGTH, response.content().readableBytes());
-        copyStreamId(request, response);
-        context.writeAndFlush(response);
-    }
-
-    @Override
-    public void onEventField(final String fieldName, final String fieldValue) {
-        if (isChannelWritable()) {
-            chunksOf(fieldName, fieldValue, maxFieldValueLength, context.alloc())
-                .forEach(chunk -> context.writeAndFlush(new DefaultHttpContent(chunk)));
-            context.writeAndFlush(new DefaultHttpContent(EMPTY_LINE.retainedSlice()));
-        }
-    }
-
-    @Override
-    public void onStreamStart() {
-        // noop
-    }
-
-    @Override
-    public void onStreamEnd() {
-        onStreamEnd(new DefaultLastHttpContent());
-    }
-
-    private void onStreamEnd(final LastHttpContent lastContent) {
-        if (isChannelWritable() && streaming) {
-            context.writeAndFlush(lastContent);
-        }
-        unregister();
-        streaming = false;
-    }
-
-    private void schedulePing() {
-        context.executor().schedule(this::sendPing, heartbeatIntervalMillis, TimeUnit.MILLISECONDS);
-    }
-
-    private void sendPing() {
-        if (isChannelWritable() && streaming) {
-            context.writeAndFlush(new DefaultHttpContent(PING_MESSAGE.retainedSlice()));
-            schedulePing();
-        }
-    }
-
-    private boolean isChannelWritable() {
-        return context != null && !context.isRemoved() && context.channel().isActive();
-    }
-
-    private void unregister() {
-        if (eventStream != null) {
-            eventStream.close();
-        }
-    }
-
-    /**
-     * Copies HTTP/2 associated stream id value (if exists) from one HTTP 1.1 message to another.
-     *
-     * @param from the message object to copy value from
-     * @param to the message object to copy value to
-     */
-    static void copyStreamId(final HttpMessage from, final HttpMessage to) {
-        final var streamId = from.headers().getInt(STREAM_ID.text());
-        if (streamId != null) {
-            to.headers().setInt(STREAM_ID.text(), streamId);
-        }
-    }
-
-}
index c4e96357cbf110bf52d62c83593a220790e5aede..39099e499d5113579051564a188c6213176680f5 100644 (file)
@@ -64,7 +64,7 @@ public final class SseUtils {
      * @param allocator the {@link ByteBufAllocator} instance used to allocate space for binary data
      * @return list of {@link ByteBuf}
      */
-    static List<ByteBuf> chunksOf(final String fieldName, final String fieldValue, final int maxValueLength,
+    public static List<ByteBuf> chunksOf(final String fieldName, final String fieldValue, final int maxValueLength,
             final ByteBufAllocator allocator) {
         final var valueStr = CRLF_MATCHER.removeFrom(requireNonNull(fieldValue));
         final var valueLen = valueStr.length();
diff --git a/transport/transport-http/src/test/java/org/opendaylight/netconf/transport/http/SseClientServerTest.java b/transport/transport-http/src/test/java/org/opendaylight/netconf/transport/http/SseClientServerTest.java
deleted file mode 100644 (file)
index 3bab037..0000000
+++ /dev/null
@@ -1,267 +0,0 @@
-/*
- * Copyright (c) 2024 PANTHEON.tech s.r.o. and others. All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-package org.opendaylight.netconf.transport.http;
-
-import static io.netty.handler.codec.http.HttpHeaderNames.CONNECTION;
-import static io.netty.handler.codec.http.HttpHeaderNames.CONTENT_LENGTH;
-import static io.netty.handler.codec.http.HttpHeaderValues.KEEP_ALIVE;
-import static io.netty.handler.codec.http.HttpMethod.GET;
-import static io.netty.handler.codec.http.HttpResponseStatus.NOT_FOUND;
-import static io.netty.handler.codec.http.HttpResponseStatus.OK;
-import static io.netty.handler.codec.http.HttpVersion.HTTP_1_1;
-import static org.awaitility.Awaitility.await;
-import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertNotNull;
-import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.timeout;
-import static org.mockito.Mockito.verify;
-import static org.opendaylight.netconf.transport.http.ConfigUtils.clientTransportTcp;
-import static org.opendaylight.netconf.transport.http.ConfigUtils.clientTransportTls;
-import static org.opendaylight.netconf.transport.http.ConfigUtils.serverTransportTcp;
-import static org.opendaylight.netconf.transport.http.ConfigUtils.serverTransportTls;
-import static org.opendaylight.netconf.transport.http.TestUtils.freePort;
-import static org.opendaylight.netconf.transport.http.TestUtils.generateX509CertData;
-import static org.opendaylight.netconf.transport.http.TestUtils.invoke;
-
-import io.netty.buffer.ByteBuf;
-import io.netty.buffer.Unpooled;
-import io.netty.channel.ChannelHandlerContext;
-import io.netty.channel.SimpleChannelInboundHandler;
-import io.netty.handler.codec.http.DefaultFullHttpRequest;
-import io.netty.handler.codec.http.DefaultFullHttpResponse;
-import io.netty.handler.codec.http.FullHttpRequest;
-import java.net.InetAddress;
-import java.nio.charset.StandardCharsets;
-import java.time.Duration;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.TimeUnit;
-import java.util.function.Consumer;
-import java.util.stream.IntStream;
-import org.junit.jupiter.api.AfterAll;
-import org.junit.jupiter.api.BeforeAll;
-import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.extension.ExtendWith;
-import org.junit.jupiter.params.ParameterizedTest;
-import org.junit.jupiter.params.provider.ValueSource;
-import org.mockito.ArgumentCaptor;
-import org.mockito.Captor;
-import org.mockito.Mock;
-import org.mockito.junit.jupiter.MockitoExtension;
-import org.opendaylight.netconf.transport.api.TransportChannelListener;
-import org.opendaylight.netconf.transport.http.EventStreamService.StartCallback;
-import org.opendaylight.netconf.transport.tcp.BootstrapFactory;
-import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.http.client.rev240208.HttpClientStackGrouping;
-import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.http.server.rev240208.HttpServerStackGrouping;
-
-@ExtendWith(MockitoExtension.class)
-class SseClientServerTest {
-    private static final String USERNAME = "username";
-    private static final String PASSWORD = "pa$$W0rd";
-    private static final Map<String, String> USER_HASHES_MAP = Map.of(USERNAME, "$0$" + PASSWORD);
-    private static final String DATA_URI = "/data";
-    private static final String STREAM_URI = "/stream";
-    private static final ByteBuf OK_CONTENT = Unpooled.wrappedBuffer("OK".getBytes(StandardCharsets.UTF_8));
-    private static final String DECLINE_MESSAGE = "decline-message";
-    private static final String DATA = "data";
-    private static final List<String> DATA_VALUES = IntStream.rangeClosed(1, 10)
-        .mapToObj(num -> "value " + num).toList();
-
-    private static BootstrapFactory bootstrapFactory;
-    private static String localAddress;
-
-    @Mock
-    private HttpServerStackGrouping serverConfig;
-    @Mock
-    private HttpClientStackGrouping clientConfig;
-    @Mock
-    private EventStreamListener eventStreamListener;
-    @Mock
-    private StartCallback startCallback;
-    @Captor
-    private ArgumentCaptor<Exception> exceptionCaptor;
-
-    private EventStreamService clientEventStreamService;
-    private TestStreamService serverEventStreamService;
-    private TestTransportListener serverTransportListener;
-    private TestTransportListener clientTransportListener;
-
-    @BeforeAll
-    static void beforeAll() {
-        bootstrapFactory = new BootstrapFactory("IntegrationTest", 0);
-        localAddress = InetAddress.getLoopbackAddress().getHostAddress();
-    }
-
-    @AfterAll
-    static void afterAll() {
-        bootstrapFactory.close();
-    }
-
-    @BeforeEach
-    void beforeEach() {
-        clientEventStreamService = null;
-        serverEventStreamService = new TestStreamService();
-        // init SSE layer on top of HTTP layer using Transport channel listeners
-        serverTransportListener = new TestTransportListener(channel -> {
-            channel.channel().pipeline().addLast(
-                new ServerSseHandler(serverEventStreamService, 0, 0),
-                new SimpleChannelInboundHandler<>(FullHttpRequest.class) {
-                    @Override
-                    protected void channelRead0(final ChannelHandlerContext ctx, final FullHttpRequest msg) {
-                        final var response = DATA_URI.equals(msg.uri())
-                            ? new DefaultFullHttpResponse(msg.protocolVersion(), OK, OK_CONTENT.copy())
-                            : new DefaultFullHttpResponse(msg.protocolVersion(), NOT_FOUND, Unpooled.EMPTY_BUFFER);
-                        response.headers().set(CONTENT_LENGTH, response.content().readableBytes());
-                        ServerSseHandler.copyStreamId(msg, response);
-                        ctx.writeAndFlush(response);
-                    }
-                });
-        });
-        clientTransportListener = new TestTransportListener(channel ->
-            clientEventStreamService = SseUtils.enableClientSse(channel));
-    }
-
-    @ParameterizedTest(name = "TCP with no authorization, HTTP/2: {0}")
-    @ValueSource(booleans = {false, true})
-    void noAuthTcp(final boolean http2) throws Exception {
-        final var localPort = freePort();
-        doReturn(serverTransportTcp(localAddress, localPort)).when(serverConfig).getTransport();
-        doReturn(clientTransportTcp(localAddress, localPort)).when(clientConfig).getTransport();
-        integrationTest(http2);
-    }
-
-    @ParameterizedTest(name = "TCP with Basic authorization, HTTP/2: {0}")
-    @ValueSource(booleans = {false, true})
-    void basicAuthTcp(final boolean http2) throws Exception {
-        final var localPort = freePort();
-        doReturn(serverTransportTcp(localAddress, localPort, USER_HASHES_MAP))
-            .when(serverConfig).getTransport();
-        doReturn(clientTransportTcp(localAddress, localPort, USERNAME, PASSWORD))
-            .when(clientConfig).getTransport();
-        integrationTest(http2);
-    }
-
-    @ParameterizedTest(name = "TLS with no authorization, HTTP/2: {0}")
-    @ValueSource(booleans = {false, true})
-    void noAuthTls(final boolean http2) throws Exception {
-        final var certData = generateX509CertData("RSA");
-        final var localPort = freePort();
-        doReturn(serverTransportTls(localAddress, localPort, certData.certificate(), certData.privateKey()))
-            .when(serverConfig).getTransport();
-        doReturn(clientTransportTls(localAddress, localPort, certData.certificate())).when(clientConfig).getTransport();
-        integrationTest(http2);
-    }
-
-    @ParameterizedTest(name = "TLS with Basic authorization, HTTP/2: {0}")
-    @ValueSource(booleans = {false, true})
-    void basicAuthTls(final boolean http2) throws Exception {
-        final var certData = generateX509CertData("RSA");
-        final var localPort = freePort();
-        doReturn(serverTransportTls(localAddress, localPort, certData.certificate(), certData.privateKey(),
-            USER_HASHES_MAP)).when(serverConfig).getTransport();
-        doReturn(clientTransportTls(localAddress, localPort, certData.certificate(), USERNAME, PASSWORD))
-            .when(clientConfig).getTransport();
-        integrationTest(http2);
-    }
-
-    private void integrationTest(final boolean http2) throws Exception {
-        final var server = HTTPServer.listen(serverTransportListener, bootstrapFactory.newServerBootstrap(),
-            serverConfig).get(2, TimeUnit.SECONDS);
-        try {
-            final var client = HTTPClient.connect(clientTransportListener, bootstrapFactory.newBootstrap(),
-                clientConfig, http2).get(2, TimeUnit.SECONDS);
-            try {
-                await().atMost(Duration.ofSeconds(2)).until(() -> serverTransportListener.initialized);
-                await().atMost(Duration.ofSeconds(2)).until(() -> clientTransportListener.initialized);
-                assertNotNull(clientEventStreamService);
-
-                // verify HTTP request/response works over current connection
-                assertGetRequest(client);
-
-                // request SSE with invalid URI
-                clientEventStreamService.startEventStream(DATA_URI, eventStreamListener, startCallback);
-                verify(startCallback, timeout(1000)).onStartFailure(exceptionCaptor.capture());
-                final var exception = exceptionCaptor.getValue();
-                assertNotNull(exception);
-                assertEquals(DECLINE_MESSAGE, exception.getMessage());
-
-                // start SSE stream with proper URI
-                clientEventStreamService.startEventStream(STREAM_URI, eventStreamListener, startCallback);
-                verify(startCallback, timeout(1000)).onStreamStarted(any());
-                verify(eventStreamListener).onStreamStart();
-
-                // send series of event fields (name:value pairs)
-                assertNotNull(serverEventStreamService.listener);
-                for (var value : DATA_VALUES) {
-                    serverEventStreamService.listener.onEventField(DATA, value);
-                    verify(eventStreamListener, timeout(1000)).onEventField(DATA, value);
-                }
-
-                // end stream while keeping connection alive
-                serverEventStreamService.listener.onStreamEnd();
-                verify(eventStreamListener, timeout(1000)).onStreamEnd();
-
-                // verify HTTP request/response works on same connection
-                assertGetRequest(client);
-
-            } finally {
-                client.shutdown().get(2, TimeUnit.SECONDS);
-            }
-        } finally {
-            server.shutdown().get(2, TimeUnit.SECONDS);
-        }
-    }
-
-    private static void assertGetRequest(final HTTPClient client) throws Exception {
-        final var request = new DefaultFullHttpRequest(HTTP_1_1, GET, DATA_URI);
-        request.headers().set(CONNECTION, KEEP_ALIVE);
-        final var response = invoke(client, request).get(2, TimeUnit.SECONDS);
-        assertNotNull(response);
-        assertEquals(OK, response.status());
-    }
-
-    private static final class TestStreamService implements EventStreamService {
-        private EventStreamListener listener;
-
-        @Override
-        public void startEventStream(final String requestUri, final EventStreamListener eventListener,
-                final StartCallback callback) {
-            if (STREAM_URI.equals(requestUri)) {
-                // accept stream request
-                listener = eventListener;
-                callback.onStreamStarted(() -> {
-                    // no-op
-                });
-            } else {
-                // decline stream request
-                callback.onStartFailure(new IllegalStateException(DECLINE_MESSAGE));
-            }
-        }
-    }
-
-    private static class TestTransportListener implements TransportChannelListener<HTTPTransportChannel> {
-        private final Consumer<HTTPTransportChannel> initializer;
-        private volatile boolean initialized;
-
-        TestTransportListener(final Consumer<HTTPTransportChannel> initializer) {
-            this.initializer = initializer;
-        }
-
-        @Override
-        public void onTransportChannelEstablished(final HTTPTransportChannel channel) {
-            initialized = true;
-            initializer.accept(channel);
-        }
-
-        @Override
-        public void onTransportChannelFailed(final Throwable cause) {
-            throw new IllegalStateException("HTTP connection failure", cause);
-        }
-    }
-}