Fully switch to refactored restconf-server SSE support.
Remove old unused logic.
For this create SSEResponse to manage SSE connection.
JIRA: NETCONF-714
Change-Id: I287bee2b2863837ea94e54d1b14e9ac9f813401b
Signed-off-by: Samuel Schneider <samuel.schneider@pantheon.tech>
protected TestEventStreamListener startStream(final String uri) {
final var eventListener = new TestEventStreamListener();
- clientStreamService.startEventStream(uri, eventListener,
+ clientStreamService.startEventStream("localhost", uri, eventListener,
new EventStreamService.StartCallback() {
@Override
public void onStreamStarted(final EventStreamService.StreamControl control) {
@NonNullByDefault
APIResource(final RestconfServer server, final List<String> otherSegments, final String restconfPath,
- final ErrorTagMapping errorTagMapping, final MessageEncoding defaultEncoding,
- final PrettyPrintParam defaultPrettyPrint, final RestconfStream.Registry streamRegistry) {
+ final ErrorTagMapping errorTagMapping, final MessageEncoding defaultEncoding,
+ final PrettyPrintParam defaultPrettyPrint, final int sseHeartbeatIntervalMillis,
+ final int sseMaximumFragmentLength, final RestconfStream.Registry streamRegistry) {
super(new EndpointInvariants(server, defaultPrettyPrint, errorTagMapping, defaultEncoding,
URI.create(requireNonNull(restconfPath))));
this.otherSegments = requireNonNull(otherSegments);
"operations", new OperationsResource(invariants),
"yang-library-version", new YLVResource(invariants),
"modules", new ModulesResource(invariants),
- "streams", new StreamsResource(invariants, streamRegistry));
+ "streams", new StreamsResource(invariants, streamRegistry, sseHeartbeatIntervalMillis,
+ sseMaximumFragmentLength));
}
@Override
package org.opendaylight.restconf.server;
import static java.util.Objects.requireNonNull;
+import static org.opendaylight.netconf.transport.http.SseUtils.chunksOf;
-import io.netty.buffer.ByteBufUtil;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
+import io.netty.handler.codec.http.DefaultHttpContent;
import io.netty.handler.codec.http.FullHttpRequest;
import org.opendaylight.restconf.server.spi.RestconfStream.Sender;
import org.opendaylight.yangtools.concepts.Registration;
* are fully compliant with the HTTP specification.
*/
final class ChannelSender extends SimpleChannelInboundHandler<FullHttpRequest> implements Sender {
+ private static final ByteBuf EMPTY_LINE = Unpooled.wrappedBuffer(new byte[] { '\r', '\n' }).asReadOnly();
+
+ private final int sseMaximumFragmentLength;
+
private Registration registration;
private ChannelHandlerContext context;
- ChannelSender() {
+ ChannelSender(final int sseMaximumFragmentLength) {
super(FullHttpRequest.class);
+ this.sseMaximumFragmentLength = sseMaximumFragmentLength;
+ context = null;
}
void enable(final Registration reg) {
@Override
public void sendDataMessage(final String data) {
if (!data.isEmpty() && context != null) {
- context.writeAndFlush(ByteBufUtil.writeUtf8(context.alloc(), data));
+ chunksOf("data", data, sseMaximumFragmentLength, context.alloc())
+ .forEach(chunk -> context.writeAndFlush(new DefaultHttpContent(chunk)));
+ context.writeAndFlush(new DefaultHttpContent(EMPTY_LINE.retainedSlice()));
}
}
}
@Override
- public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
+ public void handlerAdded(final ChannelHandlerContext ctx) throws Exception {
context = ctx;
super.handlerAdded(ctx);
}
+++ /dev/null
-/*
- * Copyright (c) 2024 PANTHEON.tech s.r.o. and others. All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-package org.opendaylight.restconf.server;
-
-import static java.util.Objects.requireNonNull;
-
-import com.google.common.annotations.VisibleForTesting;
-import io.netty.handler.codec.http.QueryStringDecoder;
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.io.UnsupportedEncodingException;
-import java.nio.charset.StandardCharsets;
-import java.util.List;
-import javax.xml.xpath.XPathExpressionException;
-import org.eclipse.jdt.annotation.NonNull;
-import org.opendaylight.netconf.transport.http.ErrorResponseException;
-import org.opendaylight.netconf.transport.http.EventStreamListener;
-import org.opendaylight.netconf.transport.http.EventStreamService;
-import org.opendaylight.restconf.api.QueryParameters;
-import org.opendaylight.restconf.api.query.PrettyPrintParam;
-import org.opendaylight.restconf.server.api.EventStreamGetParams;
-import org.opendaylight.restconf.server.api.ServerError;
-import org.opendaylight.restconf.server.api.YangErrorsBody;
-import org.opendaylight.restconf.server.spi.ErrorTagMapping;
-import org.opendaylight.restconf.server.spi.RestconfStream;
-import org.opendaylight.yangtools.yang.common.ErrorTag;
-import org.opendaylight.yangtools.yang.common.ErrorType;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public final class RestconfStreamService implements EventStreamService {
- private static final Logger LOG = LoggerFactory.getLogger(RestconfStreamService.class);
-
- @VisibleForTesting
- static final String INVALID_STREAM_URI_ERROR = "Invalid stream URI";
- @VisibleForTesting
- static final String MISSING_PARAMS_ERROR = "Both stream encoding and stream name are required.";
- @VisibleForTesting
- static final String UNKNOWN_STREAM_ERROR = "Requested stream does not exist";
-
- private static final int ERROR_BUF_SIZE = 2048;
-
- private final RestconfStream.Registry streamRegistry;
- private final String basePath;
- private final ErrorTagMapping errorTagMapping;
- private final RestconfStream.EncodingName defaultEncoding;
- private final PrettyPrintParam defaultPrettyPrint;
-
- public RestconfStreamService(final RestconfStream.Registry registry, final String restconf,
- final ErrorTagMapping errorTagMapping, final MessageEncoding defaultEncoding,
- final PrettyPrintParam defaultPrettyPrint) {
- streamRegistry = requireNonNull(registry);
- basePath = requireNonNull(restconf);
- this.defaultEncoding = defaultEncoding.streamEncodingName();
- this.errorTagMapping = errorTagMapping;
- this.defaultPrettyPrint = defaultPrettyPrint;
- }
-
- @Override
- public void startEventStream(final @NonNull String requestUri, final @NonNull EventStreamListener listener,
- final @NonNull StartCallback callback) {
- // parse URI.
- // pattern /basePath/streams/streamEncoding/streamName
- final var decoder = new QueryStringDecoder(requestUri);
- final var pathParams = PathParameters.from(decoder.path(), basePath);
- if (!PathParameters.STREAMS.equals(pathParams.apiResource())) {
- callback.onStartFailure(errorResponse(ErrorTag.DATA_MISSING, INVALID_STREAM_URI_ERROR, defaultEncoding));
- return;
- }
- final var args = pathParams.childIdentifier().split("/", 2);
- final var streamEncoding = encoding(args[0]);
- final var streamName = args.length > 1 ? args[1] : null;
- if (streamEncoding == null || streamName == null || streamName.isEmpty()) {
- callback.onStartFailure(errorResponse(ErrorTag.BAD_ATTRIBUTE, MISSING_PARAMS_ERROR,
- streamEncoding == null ? defaultEncoding : streamEncoding));
- return;
- }
-
- // find stream by name
- final var stream = streamRegistry.lookupStream(streamName);
- if (stream == null) {
- callback.onStartFailure(errorResponse(ErrorTag.DATA_MISSING, UNKNOWN_STREAM_ERROR, streamEncoding));
- return;
- }
-
- // Try starting stream via registry stream subscriber
- final var sender = new RestconfStream.Sender() {
- @Override
- public void sendDataMessage(final String data) {
- listener.onEventField("data", data);
- }
-
- @Override
- public void endOfStream() {
- listener.onStreamEnd();
- }
- };
- final var streamParams = EventStreamGetParams.of(QueryParameters.ofMultiValue(decoder.parameters()));
- try {
- final var registration = stream.addSubscriber(sender, streamEncoding, streamParams);
- if (registration != null) {
- callback.onStreamStarted(registration::close);
- } else {
- callback.onStartFailure(errorResponse(ErrorTag.DATA_MISSING, UNKNOWN_STREAM_ERROR, streamEncoding));
- }
- } catch (UnsupportedEncodingException | XPathExpressionException | IllegalArgumentException e) {
- callback.onStartFailure(errorResponse(ErrorTag.BAD_ATTRIBUTE, e.getMessage(), streamEncoding));
- }
- }
-
- private static RestconfStream.EncodingName encoding(final String encodingName) {
- try {
- return new RestconfStream.EncodingName(encodingName);
- } catch (IllegalArgumentException e) {
- LOG.warn("Stream encoding name '{}' is invalid: {}. Ignored.", encodingName, e.getMessage());
- return null;
- }
- }
-
- private Exception errorResponse(final ErrorTag errorTag, final String errorMessage,
- final RestconfStream.EncodingName encoding) {
- final var yangErrorsBody =
- new YangErrorsBody(List.of(new ServerError(ErrorType.PROTOCOL, errorTag, errorMessage)));
- final var statusCode = errorTagMapping.statusOf(errorTag).code();
- try (var out = new ByteArrayOutputStream(ERROR_BUF_SIZE)) {
- if (RestconfStream.EncodingName.RFC8040_JSON.equals(encoding)) {
- yangErrorsBody.formatToJSON(defaultPrettyPrint, out);
- return new ErrorResponseException(statusCode, out.toString(StandardCharsets.UTF_8),
- NettyMediaTypes.APPLICATION_YANG_DATA_JSON);
- } else {
- yangErrorsBody.formatToXML(defaultPrettyPrint, out);
- return new ErrorResponseException(statusCode, out.toString(StandardCharsets.UTF_8),
- NettyMediaTypes.APPLICATION_YANG_DATA_XML);
- }
- } catch (IOException e) {
- LOG.error("Failure encoding error message", e);
- // return as plain text
- return new IllegalStateException(errorMessage);
- }
- }
-}
import org.eclipse.jdt.annotation.NonNullByDefault;
import org.opendaylight.netconf.transport.api.TransportChannelListener;
import org.opendaylight.netconf.transport.http.HTTPTransportChannel;
-import org.opendaylight.netconf.transport.http.ServerSseHandler;
import org.opendaylight.restconf.server.api.RestconfServer;
import org.opendaylight.restconf.server.spi.RestconfStream;
import org.slf4j.Logger;
root = new EndpointRoot(principalService, new WellKnownResources(restconf), firstSegment,
new APIResource(server, otherSegments, sb.append('/').toString(), configuration.errorTagMapping(),
- configuration.defaultEncoding(), configuration.prettyPrint(), streamRegistry), streamRegistry);
+ configuration.defaultEncoding(), configuration.prettyPrint(),
+ configuration.sseHeartbeatIntervalMillis().intValue(),
+ configuration.sseMaximumFragmentLength().intValue(), streamRegistry), streamRegistry);
LOG.info("Initialized with service {}", server.getClass());
LOG.info("Initialized with base path: {}, default encoding: {}, default pretty print: {}", restconf,
public void onTransportChannelEstablished(final HTTPTransportChannel channel) {
final var session = new RestconfSession(channel.scheme(), root);
final var nettyChannel = channel.channel();
- nettyChannel.pipeline().addLast(
- new ServerSseHandler(
- new RestconfStreamService(streamRegistry, restconf, configuration.errorTagMapping(),
- configuration.defaultEncoding(), configuration.prettyPrint()),
- configuration.sseMaximumFragmentLength().toJava(), configuration.sseHeartbeatIntervalMillis().toJava()),
- session);
+ nettyChannel.pipeline().addLast(session);
}
@Override
import org.eclipse.jdt.annotation.NonNullByDefault;
import org.eclipse.jdt.annotation.Nullable;
import org.opendaylight.netconf.transport.http.EmptyResponse;
+import org.opendaylight.netconf.transport.http.EventStreamResponse;
import org.opendaylight.netconf.transport.http.HeadersResponse;
import org.opendaylight.netconf.transport.http.ImplementedMethod;
import org.opendaylight.netconf.transport.http.PreparedRequest;
private static final AsciiString STREAM_ID = HttpConversionUtil.ExtensionHeaderNames.STREAM_ID.text();
private final RestconfStream.Registry streamRegistry;
+ private final int sseHeartbeatIntervalMillis;
+ private final int sseMaximumFragmentLength;
private final Map<Integer, Registration> senders = new HashMap<>();
- StreamsResource(final EndpointInvariants invariants, final RestconfStream.Registry streamRegistry) {
+ StreamsResource(final EndpointInvariants invariants, final RestconfStream.Registry streamRegistry,
+ final int sseHeartbeatIntervalMillis, final int sseMaximumFragmentLength) {
super(invariants);
this.streamRegistry = requireNonNull(streamRegistry);
+ this.sseHeartbeatIntervalMillis = sseHeartbeatIntervalMillis;
+ this.sseMaximumFragmentLength = sseMaximumFragmentLength;
}
@Override
return EmptyResponse.NOT_FOUND;
}
- if (headers.contains(HttpHeaderNames.ACCEPT, HttpHeaderValues.TEXT_EVENT_STREAM, false)) {
+ if (!headers.contains(HttpHeaderNames.ACCEPT, HttpHeaderValues.TEXT_EVENT_STREAM, false)) {
return new EmptyResponse(HttpResponseStatus.NOT_ACCEPTABLE);
}
// not be servicing any other requests.
private PreparedRequest switchToEventStream(final RestconfStream<?> stream,
final RestconfStream.EncodingName encoding, final EventStreamGetParams params) {
- final var sender = new ChannelSender();
+ final var sender = new ChannelSender(sseMaximumFragmentLength);
final Registration registration = registerSender(stream, encoding, params, sender);
if (registration == null) {
return EmptyResponse.NOT_FOUND;
}
- final var ctx = sender.getCtx();
- // Replace ourselves with the sender and enable it wil the registration
- ctx.channel().pipeline().replace(ctx.handler(), null, sender);
sender.enable(registration);
- return EmptyResponse.OK;
+ return new EventStreamResponse(HttpResponseStatus.OK, sender, sseHeartbeatIntervalMillis);
}
// HTTP/2 event stream start.
private PreparedRequest addEventStream(final Integer streamId, final RestconfStream<?> stream,
- final RestconfStream.EncodingName encoding, final EventStreamGetParams params) {
+ final RestconfStream.EncodingName encoding, final EventStreamGetParams params) {
final var sender = new StreamSender(streamId);
final var registration = registerSender(stream, encoding, params, sender);
if (registration == null) {
session = new RestconfSession(HTTPScheme.HTTP,
new EndpointRoot(principalService, WELL_KNOWN, BASE_PATH.substring(1),
new APIResource(server, List.of(), "/rests/", ERROR_TAG_MAPPING, MessageEncoding.JSON, PRETTY_PRINT,
- streamRegistry), streamRegistry));
+ 1000, Integer.MAX_VALUE, streamRegistry), streamRegistry));
doReturn(channel).when(ctx).channel();
doReturn(new InetSocketAddress(0)).when(channel).remoteAddress();
session.handlerAdded(ctx);
doReturn(channel).when(ctx).channel();
doReturn(new InetSocketAddress(0)).when(channel).remoteAddress();
doReturn(pipeline).when(channel).pipeline();
- doReturn(pipeline).when(pipeline).addLast(any(ChannelHandler.class), any());
+ doReturn(pipeline).when(pipeline).addLast(any(ChannelHandler.class));
doReturn(HTTPScheme.HTTP).when(transportChannel).scheme();
// default config just for testing purposes
final var configuration = new NettyEndpointConfiguration(ErrorTagMapping.RFC8040, PrettyPrintParam.TRUE,
configuration);
listener.onTransportChannelEstablished(transportChannel);
// capture created session
- verify(pipeline).addLast(any(ChannelHandler.class), sessionCaptor.capture());
+ verify(pipeline).addLast(sessionCaptor.capture());
final var session = sessionCaptor.getValue();
session.handlerAdded(ctx);
+++ /dev/null
-/*
- * Copyright (c) 2024 PANTHEON.tech s.r.o. and others. All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-package org.opendaylight.restconf.server;
-
-import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertInstanceOf;
-import static org.junit.jupiter.api.Assertions.assertNotNull;
-import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.ArgumentMatchers.eq;
-import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.doThrow;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-import static org.opendaylight.restconf.server.PathParameters.STREAMS;
-import static org.opendaylight.restconf.server.RestconfStreamService.INVALID_STREAM_URI_ERROR;
-import static org.opendaylight.restconf.server.RestconfStreamService.MISSING_PARAMS_ERROR;
-import static org.opendaylight.restconf.server.RestconfStreamService.UNKNOWN_STREAM_ERROR;
-import static org.opendaylight.restconf.server.TestUtils.ERROR_TAG_MAPPING;
-import static org.opendaylight.restconf.server.TestUtils.assertErrorContent;
-
-import io.netty.handler.codec.http.QueryStringEncoder;
-import java.nio.charset.Charset;
-import java.util.List;
-import java.util.Map;
-import java.util.stream.Stream;
-import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.extension.ExtendWith;
-import org.junit.jupiter.params.ParameterizedTest;
-import org.junit.jupiter.params.provider.Arguments;
-import org.junit.jupiter.params.provider.MethodSource;
-import org.junit.jupiter.params.provider.ValueSource;
-import org.mockito.ArgumentCaptor;
-import org.mockito.Captor;
-import org.mockito.Mock;
-import org.mockito.junit.jupiter.MockitoExtension;
-import org.opendaylight.netconf.transport.http.ErrorResponseException;
-import org.opendaylight.netconf.transport.http.EventStreamListener;
-import org.opendaylight.netconf.transport.http.EventStreamService;
-import org.opendaylight.restconf.api.QueryParameters;
-import org.opendaylight.restconf.api.query.PrettyPrintParam;
-import org.opendaylight.restconf.api.query.SkipNotificationDataParam;
-import org.opendaylight.restconf.api.query.StartTimeParam;
-import org.opendaylight.restconf.api.query.StopTimeParam;
-import org.opendaylight.restconf.server.TestUtils.TestEncoding;
-import org.opendaylight.restconf.server.api.EventStreamGetParams;
-import org.opendaylight.restconf.server.spi.RestconfStream;
-import org.opendaylight.yangtools.concepts.Registration;
-import org.opendaylight.yangtools.yang.common.ErrorTag;
-
-@ExtendWith(MockitoExtension.class)
-class RestconfStreamServiceTest {
-
- private static final String BASE_PATH = "/rests";
- private static final String URI_PREFIX = BASE_PATH + STREAMS;
- private static final String URI_TEMPLATE = URI_PREFIX + "/%s/%s";
- private static final String XML = "xml";
- private static final String JSON = "json";
- private static final String VALID_STREAM_NAME = "valid-stream-name";
- private static final String INVALID_STREAM_NAME = "invalid-stream-name";
- private static final String ERROR_MESSAGE = "error-message";
- private static final EventStreamGetParams EMPTY_PARAMS = EventStreamGetParams.of(QueryParameters.of());
-
- @Mock
- private RestconfStream.Registry registry;
- @Mock
- private RestconfStream<?> stream;
- @Mock
- private EventStreamListener listener;
- @Mock
- private EventStreamService.StartCallback callback;
- @Mock
- private Registration registration;
-
- @Captor
- private ArgumentCaptor<Exception> exceptionCaptor;
- @Captor
- private ArgumentCaptor<EventStreamService.StreamControl> controlCaptor;
- @Captor
- private ArgumentCaptor<EventStreamGetParams> getParamsCaptor;
- @Captor
- private ArgumentCaptor<RestconfStream.Sender> senderCaptor;
-
- private RestconfStreamService streamService;
-
- @BeforeEach
- void beforeEach() {
- streamService = new RestconfStreamService(registry, BASE_PATH, ERROR_TAG_MAPPING, MessageEncoding.JSON,
- PrettyPrintParam.FALSE);
- }
-
- @ParameterizedTest
- @MethodSource
- void uriParseFailure(final String uri, final String expectedFormat, final ErrorTag expectedErrorTag,
- final String expectedMessage) {
- streamService.startEventStream(uri, listener, callback);
- verify(callback).onStartFailure(exceptionCaptor.capture());
- assertErrorResponseException(exceptionCaptor.getValue(), expectedFormat, expectedErrorTag, expectedMessage);
- }
-
- private static Stream<Arguments> uriParseFailure() {
- return Stream.of(
- // uri, expectedFormat, expectedErrorTag, expectedMessage
- Arguments.of("/", JSON, ErrorTag.DATA_MISSING, INVALID_STREAM_URI_ERROR),
- Arguments.of("/smth", JSON, ErrorTag.DATA_MISSING, INVALID_STREAM_URI_ERROR),
- Arguments.of("/rests", JSON, ErrorTag.DATA_MISSING, INVALID_STREAM_URI_ERROR),
- Arguments.of(URI_PREFIX, JSON, ErrorTag.BAD_ATTRIBUTE, MISSING_PARAMS_ERROR),
- Arguments.of(URI_PREFIX + "/..", JSON, ErrorTag.BAD_ATTRIBUTE, MISSING_PARAMS_ERROR),
- Arguments.of(URI_PREFIX + "/" + JSON, JSON, ErrorTag.BAD_ATTRIBUTE, MISSING_PARAMS_ERROR),
- Arguments.of(URI_PREFIX + "/" + XML, XML, ErrorTag.BAD_ATTRIBUTE, MISSING_PARAMS_ERROR),
- Arguments.of(URI_PREFIX + "/" + XML + "/", XML, ErrorTag.BAD_ATTRIBUTE, MISSING_PARAMS_ERROR)
- );
- }
-
- @ParameterizedTest
- @ValueSource(strings = {XML, JSON})
- void streamNotFoundFailure(final String encoding) {
- doReturn(null).when(registry).lookupStream(INVALID_STREAM_NAME);
- final var uri = URI_TEMPLATE.formatted(encoding, INVALID_STREAM_NAME);
- streamService.startEventStream(uri, listener, callback);
- verify(callback).onStartFailure(exceptionCaptor.capture());
- assertErrorResponseException(exceptionCaptor.getValue(), encoding, ErrorTag.DATA_MISSING, UNKNOWN_STREAM_ERROR);
- }
-
- @ParameterizedTest
- @ValueSource(strings = {XML, JSON})
- void streamSubscribeFailureWithException(final String encoding) throws Exception {
- final var encodingName = new RestconfStream.EncodingName(encoding);
- doReturn(stream).when(registry).lookupStream(VALID_STREAM_NAME);
- doThrow(new IllegalArgumentException(ERROR_MESSAGE))
- .when(stream).addSubscriber(any(RestconfStream.Sender.class), eq(encodingName), eq(EMPTY_PARAMS));
-
- final var uri = URI_TEMPLATE.formatted(encoding, VALID_STREAM_NAME);
- streamService.startEventStream(uri, listener, callback);
- verify(callback).onStartFailure(exceptionCaptor.capture());
- assertErrorResponseException(exceptionCaptor.getValue(), encoding, ErrorTag.BAD_ATTRIBUTE, ERROR_MESSAGE);
- }
-
- @ParameterizedTest
- @ValueSource(strings = {XML, JSON})
- void streamSubscribeFailureWithoutException(final String encoding) throws Exception {
- final var encodingName = new RestconfStream.EncodingName(encoding);
- doReturn(stream).when(registry).lookupStream(VALID_STREAM_NAME);
- doReturn(null).when(stream).addSubscriber(any(RestconfStream.Sender.class),
- eq(encodingName), eq(EMPTY_PARAMS));
-
- final var uri = URI_TEMPLATE.formatted(encoding, VALID_STREAM_NAME);
- streamService.startEventStream(uri, listener, callback);
- verify(callback).onStartFailure(exceptionCaptor.capture());
- assertErrorResponseException(exceptionCaptor.getValue(), encoding, ErrorTag.DATA_MISSING, UNKNOWN_STREAM_ERROR);
- }
-
- private static void assertErrorResponseException(final Exception caught, final String expectedFormat,
- final ErrorTag expectedErrorTag, final String expectedMessage) {
- final var exception = assertInstanceOf(ErrorResponseException.class, caught);
- final var expectedStatusCode = ERROR_TAG_MAPPING.statusOf(expectedErrorTag).code();
- assertEquals(expectedStatusCode, exception.statusCode());
- final var encoding = JSON.equals(expectedFormat) ? TestEncoding.JSON : TestEncoding.XML;
- assertEquals(encoding.responseType.toString(), exception.contentType());
- assertErrorContent(exception.getMessage(), encoding, expectedErrorTag, expectedMessage);
- }
-
- @ParameterizedTest
- @MethodSource
- void subscribeSuccess(final String encoding, final Map<String, List<String>> queryParams) throws Exception {
- final var encodingName = new RestconfStream.EncodingName(encoding);
- doReturn(stream).when(registry).lookupStream(VALID_STREAM_NAME);
- doReturn(registration).when(stream).addSubscriber(any(), eq(encodingName), any(EventStreamGetParams.class));
-
- // build uri with query parameters
- final var uriEncoder = new QueryStringEncoder(URI_TEMPLATE.formatted(encoding, VALID_STREAM_NAME),
- Charset.defaultCharset());
- queryParams.forEach((name, list) -> {
- if (!list.isEmpty()) {
- uriEncoder.addParam(name, list.get(0));
- }
- });
- streamService.startEventStream(uriEncoder.toString(), listener, callback);
- verify(stream).addSubscriber(senderCaptor.capture(), eq(encodingName), getParamsCaptor.capture());
-
- // verify params passed
- final var params = getParamsCaptor.getValue();
- assertNotNull(params);
- if (!queryParams.isEmpty()) {
- final var expected = EventStreamGetParams.of(QueryParameters.ofMultiValue(queryParams));
- assertNotNull(params.startTime());
- assertNotNull(params.stopTime());
- assertNotNull(params.skipNotificationData());
- assertEquals(expected.startTime().paramValue(), params.startTime().paramValue());
- assertEquals(expected.stopTime().paramValue(), params.stopTime().paramValue());
- assertEquals(expected.skipNotificationData().paramValue(), params.skipNotificationData().paramValue());
- }
-
- // verify sender object passed is linked to listener
- final var sender = senderCaptor.getValue();
- assertNotNull(sender);
- sender.sendDataMessage("test");
- verify(listener, times(1)).onEventField("data", "test");
- sender.endOfStream();
- verify(listener, times(1)).onStreamEnd();
-
- // verify returned control object terminates subscription registration
- verify(callback).onStreamStarted(controlCaptor.capture());
- final var control = controlCaptor.getValue();
- assertNotNull(control);
- control.close();
- verify(registration, times(1)).close();
- }
-
- private static Stream<Arguments> subscribeSuccess() {
- return Stream.of(
- Arguments.of(JSON, Map.of()),
- Arguments.of(XML, Map.of(
- StartTimeParam.uriName, List.of("2024-08-05T09:00:00Z"),
- StopTimeParam.uriName, List.of("2024-08-05T18:00:00Z"),
- SkipNotificationDataParam.uriName, List.of("true")))
- );
- }
-}
}
@Override
- public void startEventStream(final String requestUri, final EventStreamListener listener,
+ public void startEventStream(final String host, final String requestUri, final EventStreamListener listener,
final StartCallback callback) {
if (!channel.isActive()) {
callback.onStartFailure(new IllegalStateException("Connection is closed"));
request.headers()
.set(HttpHeaderNames.ACCEPT, HttpHeaderValues.TEXT_EVENT_STREAM)
.set(HttpHeaderNames.CONNECTION, HttpHeaderValues.KEEP_ALIVE)
- .set(HttpHeaderNames.TRANSFER_ENCODING, HttpHeaderValues.CHUNKED);
+ .set(HttpHeaderNames.TRANSFER_ENCODING, HttpHeaderValues.CHUNKED)
+ .set(HttpHeaderNames.HOST, host);
channel.writeAndFlush(request);
LOG.debug("SSE request sent to {}", requestUri);
}
}
@Override
- public void startEventStream(final String requestUri, final EventStreamListener listener,
+ public void startEventStream(final String host, final String requestUri, final EventStreamListener listener,
final StartCallback callback) {
if (!channel.isActive()) {
callback.onStartFailure(new IllegalStateException("Connection is closed"));
--- /dev/null
+/*
+ * Copyright (c) 2025 PANTHEON.tech, s.r.o. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.netconf.transport.http;
+
+import static io.netty.handler.codec.http2.HttpConversionUtil.ExtensionHeaderNames.STREAM_ID;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import io.netty.channel.ChannelHandler;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.handler.codec.http.DefaultHttpContent;
+import io.netty.handler.codec.http.DefaultHttpResponse;
+import io.netty.handler.codec.http.HttpHeaderNames;
+import io.netty.handler.codec.http.HttpHeaderValues;
+import io.netty.handler.codec.http.HttpMessage;
+import io.netty.handler.codec.http.HttpResponseStatus;
+import io.netty.handler.codec.http.HttpVersion;
+import java.util.concurrent.TimeUnit;
+
+public final class EventStreamResponse implements Response {
+ private static final ByteBuf PING_MESSAGE =
+ Unpooled.wrappedBuffer(new byte[] { ':', 'p', 'i', 'n', 'g', '\r', '\n', '\r', '\n' }).asReadOnly();
+ private static final String CHANNEL_SENDER_CONTEXT_NAME = "ChannelSenderContext";
+
+ private final ChannelHandler sender;
+ private final HttpResponseStatus status;
+ private final int sseHeartbeatIntervalMillis;
+
+ private ChannelHandlerContext context = null;
+
+ public EventStreamResponse(final HttpResponseStatus status, final ChannelHandler sender,
+ int sseHeartbeatIntervalMillis) {
+ this.status = status;
+ this.sender = sender;
+ this.sseHeartbeatIntervalMillis = sseHeartbeatIntervalMillis;
+ }
+
+ @Override
+ public HttpResponseStatus status() {
+ return status;
+ }
+
+ public DefaultHttpResponse start(final ChannelHandlerContext ctx, final Integer streamId,
+ final HttpVersion version) {
+ // Replace handler with the sender and get new context
+ context = ctx.channel().pipeline().replace(ctx.handler(), CHANNEL_SENDER_CONTEXT_NAME, sender)
+ .context(CHANNEL_SENDER_CONTEXT_NAME);
+
+ final var response = new DefaultHttpResponse(version, HttpResponseStatus.OK);
+ response.headers()
+ .set(HttpHeaderNames.CONTENT_TYPE, HttpHeaderValues.TEXT_EVENT_STREAM)
+ .set(HttpHeaderNames.CONNECTION, HttpHeaderValues.KEEP_ALIVE)
+ .set(HttpHeaderNames.TRANSFER_ENCODING, HttpHeaderValues.CHUNKED);
+ copyStreamId(streamId, response);
+
+ if (sseHeartbeatIntervalMillis > 0) {
+ schedulePing();
+ }
+ return response;
+ }
+
+ private void schedulePing() {
+ context.executor().schedule(this::sendPing, sseHeartbeatIntervalMillis, TimeUnit.MILLISECONDS);
+ }
+
+ private void sendPing() {
+ if (isChannelWritable()) {
+ context.writeAndFlush(new DefaultHttpContent(PING_MESSAGE.retainedSlice()));
+ schedulePing();
+ }
+ }
+
+ private boolean isChannelWritable() {
+ return context != null && !context.isRemoved() && context.channel().isActive();
+ }
+
+ static void copyStreamId(final Integer streamId, final HttpMessage to) {
+ if (streamId != null) {
+ to.headers().setInt(STREAM_ID.text(), streamId);
+ }
+ }
+}
* <p>If request is declined then {@link StartCallback#onStartFailure(Exception)} method invoked with
* {@link Exception} describing the decline reason.
*
+ * @param host host to be used in headers of stream request
* @param requestUri stream request URI
* @param listener SSE event consumer
* @param callback SSE stream request callback
*/
- void startEventStream(String requestUri, EventStreamListener listener, StartCallback callback);
+ void startEventStream(String host, String requestUri, EventStreamListener listener, StartCallback callback);
/**
* Invoked when the request to attach to an event stream finishes.
import io.netty.handler.codec.http.HttpHeaderNames;
import io.netty.handler.codec.http.HttpHeaders;
import io.netty.handler.codec.http.HttpMethod;
+import io.netty.handler.codec.http.HttpResponse;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.netty.handler.codec.http.HttpUtil;
import io.netty.handler.codec.http.HttpVersion;
@NonNullByDefault
static final void respond(final ChannelHandlerContext ctx, final @Nullable Integer streamId,
- final FullHttpResponse response) {
+ final HttpResponse response) {
requireNonNull(response);
if (streamId != null) {
response.headers().setInt(STREAM_ID, streamId);
* {@link #asResponse()}. It has a {@link #status()} and can be turned into (a series of) HTTP codec objects.
*/
@NonNullByDefault
-public sealed interface Response extends CompletedRequest permits ReadyResponse, FiniteResponse {
+public sealed interface Response extends CompletedRequest permits ReadyResponse, FiniteResponse, EventStreamResponse {
@Override
default Response asResponse() {
return this;
switch (response) {
case ReadyResponse resp -> HTTPServerSession.respond(ctx, streamId, resp.toHttpResponse(version));
case FiniteResponse resp -> respond(ctx, streamId, version, resp);
+ case EventStreamResponse resp -> HTTPServerSession.respond(ctx, streamId,
+ resp.start(ctx, streamId, version));
}
}
+++ /dev/null
-/*
- * Copyright (c) 2024 PANTHEON.tech s.r.o. and others. All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-package org.opendaylight.netconf.transport.http;
-
-import static io.netty.handler.codec.http2.HttpConversionUtil.ExtensionHeaderNames.STREAM_ID;
-import static java.util.Objects.requireNonNull;
-import static org.opendaylight.netconf.transport.http.SseUtils.chunksOf;
-
-import io.netty.buffer.ByteBuf;
-import io.netty.buffer.Unpooled;
-import io.netty.channel.ChannelHandlerContext;
-import io.netty.channel.ChannelInboundHandlerAdapter;
-import io.netty.handler.codec.http.DefaultFullHttpResponse;
-import io.netty.handler.codec.http.DefaultHttpContent;
-import io.netty.handler.codec.http.DefaultHttpResponse;
-import io.netty.handler.codec.http.DefaultLastHttpContent;
-import io.netty.handler.codec.http.FullHttpRequest;
-import io.netty.handler.codec.http.HttpHeaderNames;
-import io.netty.handler.codec.http.HttpHeaderValues;
-import io.netty.handler.codec.http.HttpMessage;
-import io.netty.handler.codec.http.HttpMethod;
-import io.netty.handler.codec.http.HttpResponseStatus;
-import io.netty.handler.codec.http.LastHttpContent;
-import java.nio.charset.StandardCharsets;
-import java.util.concurrent.TimeUnit;
-import org.opendaylight.netconf.transport.http.EventStreamService.StartCallback;
-import org.opendaylight.netconf.transport.http.EventStreamService.StreamControl;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Server side Server-Sent Event (SSE) Handler.
- *
- * <p>Intercepts GET requests with {@code accept=text/event-stream} header, invokes the
- * {@link EventStreamService#startEventStream(String, EventStreamListener, StartCallback)} using
- * request {@code URI} as parameter. If request is accepted the handler starts an event stream as
- * {@code transfer-encoding=chunked} response: headers are sent immediately, body chunks are sent on
- * service events.
- *
- * <p>If request is not accepted then error response will be returned with error message as response body.
- * If decline exception is an instance of {@link ErrorResponseException} then explicitly defined
- * {@code content-type} value and response status code will be used in error response header.
- *
- * <p>@deprecated This class will be removed in next release and replaced by dedicated {@code AbstractResource}
- * handling RFC 8040 streams endpoint.
- */
-@Deprecated(forRemoval = true)
-public final class ServerSseHandler extends ChannelInboundHandlerAdapter implements EventStreamListener {
- private static final Logger LOG = LoggerFactory.getLogger(ServerSseHandler.class);
- private static final ByteBuf PING_MESSAGE =
- Unpooled.wrappedBuffer(new byte[] { ':', 'p', 'i', 'n', 'g', '\r', '\n', '\r', '\n' }).asReadOnly();
- private static final ByteBuf EMPTY_LINE = Unpooled.wrappedBuffer(new byte[] { '\r', '\n' }).asReadOnly();
-
- private final int maxFieldValueLength;
- private final long heartbeatIntervalMillis;
- private final EventStreamService service;
-
- private ChannelHandlerContext context;
- private StreamControl eventStream;
- private boolean streaming;
-
- /**
- * Default constructor.
- *
- * @param service the event stream service instance
- * @param maxFieldValueLength max length of event message in chars, if parameter value is greater than zero and
- * message length exceeds the limit then message will split to sequence of shorter messages;
- * if parameter value is zero or less, then message length won't be checked
- * @param heartbeatIntervalMillis the keep-alive ping message interval in milliseconds, if set to zero or less
- * no ping message will be sent by server
- */
- public ServerSseHandler(final EventStreamService service, final int maxFieldValueLength,
- final long heartbeatIntervalMillis) {
- this.service = requireNonNull(service);
- this.maxFieldValueLength = maxFieldValueLength;
- this.heartbeatIntervalMillis = heartbeatIntervalMillis;
- }
-
- @Override
- public void handlerAdded(final ChannelHandlerContext ctx) throws Exception {
- context = ctx;
- ctx.channel().closeFuture().addListener(ignored -> unregister());
- LOG.debug("Server SSE enabled on channel {}", context.channel());
- }
-
- @Override
- public void channelRead(final ChannelHandlerContext ctx, final Object msg) throws Exception {
- switch (msg) {
- case FullHttpRequest request -> channelRead(ctx, request);
- default -> super.channelRead(ctx, msg);
- }
- }
-
- private void channelRead(final ChannelHandlerContext ctx, final FullHttpRequest request) {
- if (streaming) {
- LOG.warn("Ignoring unexpected request while SSE stream is active: {}", request);
- return;
- }
-
- if (HttpMethod.GET.equals(request.method())
- && request.headers().contains(HttpHeaderNames.ACCEPT, HttpHeaderValues.TEXT_EVENT_STREAM, true)) {
-
- service.startEventStream(request.retain().uri(), this, new StartCallback() {
- @Override
- public void onStreamStarted(final StreamControl streamControl) {
- confirmEventStreamRequest(request, streamControl);
- request.release();
- }
-
- @Override
- public void onStartFailure(final Exception exception) {
- declineEventStreamRequest(request, exception);
- request.release();
- }
- });
- return;
- }
-
- // pass request to next handler
- ctx.fireChannelRead(request);
- }
-
- private void confirmEventStreamRequest(final FullHttpRequest request, final StreamControl startedStream) {
- eventStream = startedStream;
- streaming = true;
- // response OK with headers only, body chunks will be an event stream
- final var response = new DefaultHttpResponse(request.protocolVersion(), HttpResponseStatus.OK);
- response.headers()
- .set(HttpHeaderNames.CONTENT_TYPE, HttpHeaderValues.TEXT_EVENT_STREAM)
- .set(HttpHeaderNames.CONNECTION, HttpHeaderValues.KEEP_ALIVE)
- .set(HttpHeaderNames.TRANSFER_ENCODING, HttpHeaderValues.CHUNKED);
- copyStreamId(request, response);
- context.writeAndFlush(response);
- // schedule keep-alive events (no action required SSE 'ping' comment) if necessary
- if (heartbeatIntervalMillis > 0) {
- schedulePing();
- }
- LOG.debug("Event Stream request accepted for URI={}", request.uri());
- }
-
- private void declineEventStreamRequest(final FullHttpRequest request, final Throwable exception) {
- final var errorMessage = exception.getMessage();
- LOG.debug("Event Stream request declined for URI={} -> {}", request.uri(), errorMessage);
- final HttpResponseStatus status;
- final CharSequence contentType;
- if (exception instanceof ErrorResponseException errorResponse) {
- status = HttpResponseStatus.valueOf(errorResponse.statusCode());
- contentType = errorResponse.contentType();
- } else {
- status = HttpResponseStatus.BAD_REQUEST;
- contentType = HttpHeaderValues.TEXT_PLAIN;
- }
- final var response = new DefaultFullHttpResponse(request.protocolVersion(), status,
- Unpooled.wrappedBuffer(errorMessage.getBytes(StandardCharsets.UTF_8)));
- response.headers().set(HttpHeaderNames.CONTENT_TYPE, contentType)
- .setInt(HttpHeaderNames.CONTENT_LENGTH, response.content().readableBytes());
- copyStreamId(request, response);
- context.writeAndFlush(response);
- }
-
- @Override
- public void onEventField(final String fieldName, final String fieldValue) {
- if (isChannelWritable()) {
- chunksOf(fieldName, fieldValue, maxFieldValueLength, context.alloc())
- .forEach(chunk -> context.writeAndFlush(new DefaultHttpContent(chunk)));
- context.writeAndFlush(new DefaultHttpContent(EMPTY_LINE.retainedSlice()));
- }
- }
-
- @Override
- public void onStreamStart() {
- // noop
- }
-
- @Override
- public void onStreamEnd() {
- onStreamEnd(new DefaultLastHttpContent());
- }
-
- private void onStreamEnd(final LastHttpContent lastContent) {
- if (isChannelWritable() && streaming) {
- context.writeAndFlush(lastContent);
- }
- unregister();
- streaming = false;
- }
-
- private void schedulePing() {
- context.executor().schedule(this::sendPing, heartbeatIntervalMillis, TimeUnit.MILLISECONDS);
- }
-
- private void sendPing() {
- if (isChannelWritable() && streaming) {
- context.writeAndFlush(new DefaultHttpContent(PING_MESSAGE.retainedSlice()));
- schedulePing();
- }
- }
-
- private boolean isChannelWritable() {
- return context != null && !context.isRemoved() && context.channel().isActive();
- }
-
- private void unregister() {
- if (eventStream != null) {
- eventStream.close();
- }
- }
-
- /**
- * Copies HTTP/2 associated stream id value (if exists) from one HTTP 1.1 message to another.
- *
- * @param from the message object to copy value from
- * @param to the message object to copy value to
- */
- static void copyStreamId(final HttpMessage from, final HttpMessage to) {
- final var streamId = from.headers().getInt(STREAM_ID.text());
- if (streamId != null) {
- to.headers().setInt(STREAM_ID.text(), streamId);
- }
- }
-
-}
* @param allocator the {@link ByteBufAllocator} instance used to allocate space for binary data
* @return list of {@link ByteBuf}
*/
- static List<ByteBuf> chunksOf(final String fieldName, final String fieldValue, final int maxValueLength,
+ public static List<ByteBuf> chunksOf(final String fieldName, final String fieldValue, final int maxValueLength,
final ByteBufAllocator allocator) {
final var valueStr = CRLF_MATCHER.removeFrom(requireNonNull(fieldValue));
final var valueLen = valueStr.length();
+++ /dev/null
-/*
- * Copyright (c) 2024 PANTHEON.tech s.r.o. and others. All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-package org.opendaylight.netconf.transport.http;
-
-import static io.netty.handler.codec.http.HttpHeaderNames.CONNECTION;
-import static io.netty.handler.codec.http.HttpHeaderNames.CONTENT_LENGTH;
-import static io.netty.handler.codec.http.HttpHeaderValues.KEEP_ALIVE;
-import static io.netty.handler.codec.http.HttpMethod.GET;
-import static io.netty.handler.codec.http.HttpResponseStatus.NOT_FOUND;
-import static io.netty.handler.codec.http.HttpResponseStatus.OK;
-import static io.netty.handler.codec.http.HttpVersion.HTTP_1_1;
-import static org.awaitility.Awaitility.await;
-import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertNotNull;
-import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.timeout;
-import static org.mockito.Mockito.verify;
-import static org.opendaylight.netconf.transport.http.ConfigUtils.clientTransportTcp;
-import static org.opendaylight.netconf.transport.http.ConfigUtils.clientTransportTls;
-import static org.opendaylight.netconf.transport.http.ConfigUtils.serverTransportTcp;
-import static org.opendaylight.netconf.transport.http.ConfigUtils.serverTransportTls;
-import static org.opendaylight.netconf.transport.http.TestUtils.freePort;
-import static org.opendaylight.netconf.transport.http.TestUtils.generateX509CertData;
-import static org.opendaylight.netconf.transport.http.TestUtils.invoke;
-
-import io.netty.buffer.ByteBuf;
-import io.netty.buffer.Unpooled;
-import io.netty.channel.ChannelHandlerContext;
-import io.netty.channel.SimpleChannelInboundHandler;
-import io.netty.handler.codec.http.DefaultFullHttpRequest;
-import io.netty.handler.codec.http.DefaultFullHttpResponse;
-import io.netty.handler.codec.http.FullHttpRequest;
-import java.net.InetAddress;
-import java.nio.charset.StandardCharsets;
-import java.time.Duration;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.TimeUnit;
-import java.util.function.Consumer;
-import java.util.stream.IntStream;
-import org.junit.jupiter.api.AfterAll;
-import org.junit.jupiter.api.BeforeAll;
-import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.extension.ExtendWith;
-import org.junit.jupiter.params.ParameterizedTest;
-import org.junit.jupiter.params.provider.ValueSource;
-import org.mockito.ArgumentCaptor;
-import org.mockito.Captor;
-import org.mockito.Mock;
-import org.mockito.junit.jupiter.MockitoExtension;
-import org.opendaylight.netconf.transport.api.TransportChannelListener;
-import org.opendaylight.netconf.transport.http.EventStreamService.StartCallback;
-import org.opendaylight.netconf.transport.tcp.BootstrapFactory;
-import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.http.client.rev240208.HttpClientStackGrouping;
-import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.http.server.rev240208.HttpServerStackGrouping;
-
-@ExtendWith(MockitoExtension.class)
-class SseClientServerTest {
- private static final String USERNAME = "username";
- private static final String PASSWORD = "pa$$W0rd";
- private static final Map<String, String> USER_HASHES_MAP = Map.of(USERNAME, "$0$" + PASSWORD);
- private static final String DATA_URI = "/data";
- private static final String STREAM_URI = "/stream";
- private static final ByteBuf OK_CONTENT = Unpooled.wrappedBuffer("OK".getBytes(StandardCharsets.UTF_8));
- private static final String DECLINE_MESSAGE = "decline-message";
- private static final String DATA = "data";
- private static final List<String> DATA_VALUES = IntStream.rangeClosed(1, 10)
- .mapToObj(num -> "value " + num).toList();
-
- private static BootstrapFactory bootstrapFactory;
- private static String localAddress;
-
- @Mock
- private HttpServerStackGrouping serverConfig;
- @Mock
- private HttpClientStackGrouping clientConfig;
- @Mock
- private EventStreamListener eventStreamListener;
- @Mock
- private StartCallback startCallback;
- @Captor
- private ArgumentCaptor<Exception> exceptionCaptor;
-
- private EventStreamService clientEventStreamService;
- private TestStreamService serverEventStreamService;
- private TestTransportListener serverTransportListener;
- private TestTransportListener clientTransportListener;
-
- @BeforeAll
- static void beforeAll() {
- bootstrapFactory = new BootstrapFactory("IntegrationTest", 0);
- localAddress = InetAddress.getLoopbackAddress().getHostAddress();
- }
-
- @AfterAll
- static void afterAll() {
- bootstrapFactory.close();
- }
-
- @BeforeEach
- void beforeEach() {
- clientEventStreamService = null;
- serverEventStreamService = new TestStreamService();
- // init SSE layer on top of HTTP layer using Transport channel listeners
- serverTransportListener = new TestTransportListener(channel -> {
- channel.channel().pipeline().addLast(
- new ServerSseHandler(serverEventStreamService, 0, 0),
- new SimpleChannelInboundHandler<>(FullHttpRequest.class) {
- @Override
- protected void channelRead0(final ChannelHandlerContext ctx, final FullHttpRequest msg) {
- final var response = DATA_URI.equals(msg.uri())
- ? new DefaultFullHttpResponse(msg.protocolVersion(), OK, OK_CONTENT.copy())
- : new DefaultFullHttpResponse(msg.protocolVersion(), NOT_FOUND, Unpooled.EMPTY_BUFFER);
- response.headers().set(CONTENT_LENGTH, response.content().readableBytes());
- ServerSseHandler.copyStreamId(msg, response);
- ctx.writeAndFlush(response);
- }
- });
- });
- clientTransportListener = new TestTransportListener(channel ->
- clientEventStreamService = SseUtils.enableClientSse(channel));
- }
-
- @ParameterizedTest(name = "TCP with no authorization, HTTP/2: {0}")
- @ValueSource(booleans = {false, true})
- void noAuthTcp(final boolean http2) throws Exception {
- final var localPort = freePort();
- doReturn(serverTransportTcp(localAddress, localPort)).when(serverConfig).getTransport();
- doReturn(clientTransportTcp(localAddress, localPort)).when(clientConfig).getTransport();
- integrationTest(http2);
- }
-
- @ParameterizedTest(name = "TCP with Basic authorization, HTTP/2: {0}")
- @ValueSource(booleans = {false, true})
- void basicAuthTcp(final boolean http2) throws Exception {
- final var localPort = freePort();
- doReturn(serverTransportTcp(localAddress, localPort, USER_HASHES_MAP))
- .when(serverConfig).getTransport();
- doReturn(clientTransportTcp(localAddress, localPort, USERNAME, PASSWORD))
- .when(clientConfig).getTransport();
- integrationTest(http2);
- }
-
- @ParameterizedTest(name = "TLS with no authorization, HTTP/2: {0}")
- @ValueSource(booleans = {false, true})
- void noAuthTls(final boolean http2) throws Exception {
- final var certData = generateX509CertData("RSA");
- final var localPort = freePort();
- doReturn(serverTransportTls(localAddress, localPort, certData.certificate(), certData.privateKey()))
- .when(serverConfig).getTransport();
- doReturn(clientTransportTls(localAddress, localPort, certData.certificate())).when(clientConfig).getTransport();
- integrationTest(http2);
- }
-
- @ParameterizedTest(name = "TLS with Basic authorization, HTTP/2: {0}")
- @ValueSource(booleans = {false, true})
- void basicAuthTls(final boolean http2) throws Exception {
- final var certData = generateX509CertData("RSA");
- final var localPort = freePort();
- doReturn(serverTransportTls(localAddress, localPort, certData.certificate(), certData.privateKey(),
- USER_HASHES_MAP)).when(serverConfig).getTransport();
- doReturn(clientTransportTls(localAddress, localPort, certData.certificate(), USERNAME, PASSWORD))
- .when(clientConfig).getTransport();
- integrationTest(http2);
- }
-
- private void integrationTest(final boolean http2) throws Exception {
- final var server = HTTPServer.listen(serverTransportListener, bootstrapFactory.newServerBootstrap(),
- serverConfig).get(2, TimeUnit.SECONDS);
- try {
- final var client = HTTPClient.connect(clientTransportListener, bootstrapFactory.newBootstrap(),
- clientConfig, http2).get(2, TimeUnit.SECONDS);
- try {
- await().atMost(Duration.ofSeconds(2)).until(() -> serverTransportListener.initialized);
- await().atMost(Duration.ofSeconds(2)).until(() -> clientTransportListener.initialized);
- assertNotNull(clientEventStreamService);
-
- // verify HTTP request/response works over current connection
- assertGetRequest(client);
-
- // request SSE with invalid URI
- clientEventStreamService.startEventStream(DATA_URI, eventStreamListener, startCallback);
- verify(startCallback, timeout(1000)).onStartFailure(exceptionCaptor.capture());
- final var exception = exceptionCaptor.getValue();
- assertNotNull(exception);
- assertEquals(DECLINE_MESSAGE, exception.getMessage());
-
- // start SSE stream with proper URI
- clientEventStreamService.startEventStream(STREAM_URI, eventStreamListener, startCallback);
- verify(startCallback, timeout(1000)).onStreamStarted(any());
- verify(eventStreamListener).onStreamStart();
-
- // send series of event fields (name:value pairs)
- assertNotNull(serverEventStreamService.listener);
- for (var value : DATA_VALUES) {
- serverEventStreamService.listener.onEventField(DATA, value);
- verify(eventStreamListener, timeout(1000)).onEventField(DATA, value);
- }
-
- // end stream while keeping connection alive
- serverEventStreamService.listener.onStreamEnd();
- verify(eventStreamListener, timeout(1000)).onStreamEnd();
-
- // verify HTTP request/response works on same connection
- assertGetRequest(client);
-
- } finally {
- client.shutdown().get(2, TimeUnit.SECONDS);
- }
- } finally {
- server.shutdown().get(2, TimeUnit.SECONDS);
- }
- }
-
- private static void assertGetRequest(final HTTPClient client) throws Exception {
- final var request = new DefaultFullHttpRequest(HTTP_1_1, GET, DATA_URI);
- request.headers().set(CONNECTION, KEEP_ALIVE);
- final var response = invoke(client, request).get(2, TimeUnit.SECONDS);
- assertNotNull(response);
- assertEquals(OK, response.status());
- }
-
- private static final class TestStreamService implements EventStreamService {
- private EventStreamListener listener;
-
- @Override
- public void startEventStream(final String requestUri, final EventStreamListener eventListener,
- final StartCallback callback) {
- if (STREAM_URI.equals(requestUri)) {
- // accept stream request
- listener = eventListener;
- callback.onStreamStarted(() -> {
- // no-op
- });
- } else {
- // decline stream request
- callback.onStartFailure(new IllegalStateException(DECLINE_MESSAGE));
- }
- }
- }
-
- private static class TestTransportListener implements TransportChannelListener<HTTPTransportChannel> {
- private final Consumer<HTTPTransportChannel> initializer;
- private volatile boolean initialized;
-
- TestTransportListener(final Consumer<HTTPTransportChannel> initializer) {
- this.initializer = initializer;
- }
-
- @Override
- public void onTransportChannelEstablished(final HTTPTransportChannel channel) {
- initialized = true;
- initializer.accept(channel);
- }
-
- @Override
- public void onTransportChannelFailed(final Throwable cause) {
- throw new IllegalStateException("HTTP connection failure", cause);
- }
- }
-}