/*
 * Copyright (c) 2024 PANTHEON.tech s.r.o. and others. All rights reserved.
 *
 * This program and the accompanying materials are made available under the
 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
 * and is available at http://www.eclipse.org/legal/epl-v10.html
 */
8 package org.opendaylight.netconf.transport.http;
10 import static io.netty.handler.codec.http.HttpHeaderNames.CONNECTION;
11 import static io.netty.handler.codec.http.HttpHeaderNames.CONTENT_LENGTH;
12 import static io.netty.handler.codec.http.HttpHeaderValues.KEEP_ALIVE;
13 import static io.netty.handler.codec.http.HttpMethod.GET;
14 import static io.netty.handler.codec.http.HttpResponseStatus.NOT_FOUND;
15 import static io.netty.handler.codec.http.HttpResponseStatus.OK;
16 import static io.netty.handler.codec.http.HttpVersion.HTTP_1_1;
17 import static org.awaitility.Awaitility.await;
18 import static org.junit.jupiter.api.Assertions.assertEquals;
19 import static org.junit.jupiter.api.Assertions.assertNotNull;
20 import static org.mockito.ArgumentMatchers.any;
21 import static org.mockito.Mockito.doReturn;
22 import static org.mockito.Mockito.timeout;
23 import static org.mockito.Mockito.verify;
24 import static org.opendaylight.netconf.transport.http.ConfigUtils.clientTransportTcp;
25 import static org.opendaylight.netconf.transport.http.ConfigUtils.clientTransportTls;
26 import static org.opendaylight.netconf.transport.http.ConfigUtils.serverTransportTcp;
27 import static org.opendaylight.netconf.transport.http.ConfigUtils.serverTransportTls;
28 import static org.opendaylight.netconf.transport.http.TestUtils.freePort;
29 import static org.opendaylight.netconf.transport.http.TestUtils.generateX509CertData;
30 import static org.opendaylight.netconf.transport.http.TestUtils.invoke;
32 import io.netty.buffer.ByteBuf;
33 import io.netty.buffer.Unpooled;
34 import io.netty.channel.Channel;
35 import io.netty.handler.codec.http.DefaultFullHttpRequest;
36 import io.netty.handler.codec.http.DefaultFullHttpResponse;
37 import java.net.InetAddress;
38 import java.nio.charset.StandardCharsets;
39 import java.time.Duration;
40 import java.util.List;
42 import java.util.concurrent.TimeUnit;
43 import java.util.function.Consumer;
44 import java.util.stream.IntStream;
45 import org.junit.jupiter.api.AfterAll;
46 import org.junit.jupiter.api.BeforeAll;
47 import org.junit.jupiter.api.BeforeEach;
48 import org.junit.jupiter.api.extension.ExtendWith;
49 import org.junit.jupiter.params.ParameterizedTest;
50 import org.junit.jupiter.params.provider.ValueSource;
51 import org.mockito.ArgumentCaptor;
52 import org.mockito.Captor;
53 import org.mockito.Mock;
54 import org.mockito.junit.jupiter.MockitoExtension;
55 import org.opendaylight.netconf.transport.api.TransportChannel;
56 import org.opendaylight.netconf.transport.api.TransportChannelListener;
57 import org.opendaylight.netconf.transport.http.EventStreamService.StartCallback;
58 import org.opendaylight.netconf.transport.tcp.BootstrapFactory;
59 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.http.client.rev240208.HttpClientStackGrouping;
60 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.http.server.rev240208.HttpServerStackGrouping;
62 @ExtendWith(MockitoExtension.class)
63 class SseClientServerTest {
64 private static final String USERNAME = "username";
65 private static final String PASSWORD = "pa$$W0rd";
66 private static final Map<String, String> USER_HASHES_MAP = Map.of(USERNAME, "$0$" + PASSWORD);
67 private static final String DATA_URI = "/data";
68 private static final String STREAM_URI = "/stream";
69 private static final ByteBuf OK_CONTENT = Unpooled.wrappedBuffer("OK".getBytes(StandardCharsets.UTF_8));
70 private static final String DECLINE_MESSAGE = "decline-message";
71 private static final String DATA = "data";
72 private static final List<String> DATA_VALUES = IntStream.rangeClosed(1, 10)
73 .mapToObj(num -> "value " + num).toList();
75 private static RequestDispatcher requestDispatcher;
76 private static BootstrapFactory bootstrapFactory;
77 private static String localAddress;
80 private HttpServerStackGrouping serverConfig;
82 private HttpClientStackGrouping clientConfig;
84 private EventStreamListener eventStreamListener;
86 private StartCallback startCallback;
88 private ArgumentCaptor<Exception> exceptionCaptor;
90 private EventStreamService clientEventStreamService;
91 private TestStreamService serverEventStreamService;
92 private TestTransportListener serverTransportListener;
93 private TestTransportListener clientTransportListener;
96 static void beforeAll() {
97 bootstrapFactory = new BootstrapFactory("IntegrationTest", 0);
98 localAddress = InetAddress.getLoopbackAddress().getHostAddress();
99 requestDispatcher = (request, callback) -> {
100 final var response = DATA_URI.equals(request.uri())
101 ? new DefaultFullHttpResponse(request.protocolVersion(), OK, OK_CONTENT.copy())
102 : new DefaultFullHttpResponse(request.protocolVersion(), NOT_FOUND, Unpooled.EMPTY_BUFFER);
103 response.headers().set(CONTENT_LENGTH, response.content().readableBytes());
104 callback.onSuccess(response);
109 static void afterAll() {
110 bootstrapFactory.close();
115 clientEventStreamService = null;
116 serverEventStreamService = new TestStreamService();
117 // init SSE layer on top of HTTP layer using Transport channel listeners
118 serverTransportListener = new TestTransportListener(channel ->
119 SseUtils.enableServerSse(channel, serverEventStreamService, 0, 0));
120 clientTransportListener = new TestTransportListener(channel ->
121 SseClientServerTest.this.clientEventStreamService = SseUtils.enableClientSse(channel));
124 @ParameterizedTest(name = "TCP with no authorization, HTTP/2: {0}")
125 @ValueSource(booleans = {false, true})
126 void noAuthTcp(final boolean http2) throws Exception {
127 final var localPort = freePort();
128 doReturn(serverTransportTcp(localAddress, localPort)).when(serverConfig).getTransport();
129 doReturn(clientTransportTcp(localAddress, localPort)).when(clientConfig).getTransport();
130 integrationTest(http2);
133 @ParameterizedTest(name = "TCP with Basic authorization, HTTP/2: {0}")
134 @ValueSource(booleans = {false, true})
135 void basicAuthTcp(final boolean http2) throws Exception {
136 final var localPort = freePort();
137 doReturn(serverTransportTcp(localAddress, localPort, USER_HASHES_MAP))
138 .when(serverConfig).getTransport();
139 doReturn(clientTransportTcp(localAddress, localPort, USERNAME, PASSWORD))
140 .when(clientConfig).getTransport();
141 integrationTest(http2);
144 @ParameterizedTest(name = "TLS with no authorization, HTTP/2: {0}")
145 @ValueSource(booleans = {false, true})
146 void noAuthTls(final boolean http2) throws Exception {
147 final var certData = generateX509CertData("RSA");
148 final var localPort = freePort();
149 doReturn(serverTransportTls(localAddress, localPort, certData.certificate(), certData.privateKey()))
150 .when(serverConfig).getTransport();
151 doReturn(clientTransportTls(localAddress, localPort, certData.certificate())).when(clientConfig).getTransport();
152 integrationTest(http2);
155 @ParameterizedTest(name = "TLS with Basic authorization, HTTP/2: {0}")
156 @ValueSource(booleans = {false, true})
157 void basicAuthTls(final boolean http2) throws Exception {
158 final var certData = generateX509CertData("RSA");
159 final var localPort = freePort();
160 doReturn(serverTransportTls(localAddress, localPort, certData.certificate(), certData.privateKey(),
161 USER_HASHES_MAP)).when(serverConfig).getTransport();
162 doReturn(clientTransportTls(localAddress, localPort, certData.certificate(), USERNAME, PASSWORD))
163 .when(clientConfig).getTransport();
164 integrationTest(http2);
167 private void integrationTest(final boolean http2) throws Exception {
168 final var server = HTTPServer.listen(serverTransportListener, bootstrapFactory.newServerBootstrap(),
169 serverConfig, requestDispatcher).get(2, TimeUnit.SECONDS);
171 final var client = HTTPClient.connect(clientTransportListener, bootstrapFactory.newBootstrap(),
172 clientConfig, http2).get(2, TimeUnit.SECONDS);
174 await().atMost(Duration.ofSeconds(2)).until(() -> serverTransportListener.initialized);
175 await().atMost(Duration.ofSeconds(2)).until(() -> clientTransportListener.initialized);
176 assertNotNull(clientEventStreamService);
178 // verify HTTP request/response works over current connection
179 assertGetRequest(client);
181 // request SSE with invalid URI
182 clientEventStreamService.startEventStream(DATA_URI, eventStreamListener, startCallback);
183 verify(startCallback, timeout(1000)).onStartFailure(exceptionCaptor.capture());
184 final var exception = exceptionCaptor.getValue();
185 assertNotNull(exception);
186 assertEquals(DECLINE_MESSAGE, exception.getMessage());
188 // start SSE stream with proper URI
189 clientEventStreamService.startEventStream(STREAM_URI, eventStreamListener, startCallback);
190 verify(startCallback, timeout(1000)).onStreamStarted(any());
191 verify(eventStreamListener).onStreamStart();
193 // send series of event fields (name:value pairs)
194 assertNotNull(serverEventStreamService.listener);
195 for (var value : DATA_VALUES) {
196 serverEventStreamService.listener.onEventField(DATA, value);
197 verify(eventStreamListener, timeout(1000)).onEventField(DATA, value);
200 // end stream while keeping connection alive
201 serverEventStreamService.listener.onStreamEnd();
202 verify(eventStreamListener, timeout(1000)).onStreamEnd();
204 // verify HTTP request/response works on same connection
205 assertGetRequest(client);
208 client.shutdown().get(2, TimeUnit.SECONDS);
211 server.shutdown().get(2, TimeUnit.SECONDS);
215 private static void assertGetRequest(final HTTPClient client) throws Exception {
216 final var request = new DefaultFullHttpRequest(HTTP_1_1, GET, DATA_URI);
217 request.headers().set(CONNECTION, KEEP_ALIVE);
218 final var response = invoke(client, request).get(2, TimeUnit.SECONDS);
219 assertNotNull(response);
220 assertEquals(OK, response.status());
223 private static final class TestStreamService implements EventStreamService {
224 private EventStreamListener listener;
227 public void startEventStream(final String requestUri, final EventStreamListener eventListener,
228 final StartCallback callback) {
229 if (STREAM_URI.equals(requestUri)) {
230 // accept stream request
231 listener = eventListener;
232 callback.onStreamStarted(() -> {
236 // decline stream request
237 callback.onStartFailure(new IllegalStateException(DECLINE_MESSAGE));
242 private static class TestTransportListener implements TransportChannelListener {
243 private final Consumer<Channel> initializer;
244 private volatile boolean initialized;
246 TestTransportListener(final Consumer<Channel> initializer) {
247 this.initializer = initializer;
251 public void onTransportChannelEstablished(final TransportChannel channel) {
253 initializer.accept(channel.channel());
257 public void onTransportChannelFailed(final Throwable cause) {
258 throw new IllegalStateException("HTTP connection failure", cause);