397202dba213c8c872c3d32a25f569140b196a7d
[netconf.git] /
1 /*
2  * Copyright (c) 2024 PANTHEON.tech s.r.o. and others. All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.netconf.transport.http;
9
10 import static io.netty.handler.codec.http.HttpHeaderNames.CONNECTION;
11 import static io.netty.handler.codec.http.HttpHeaderNames.CONTENT_LENGTH;
12 import static io.netty.handler.codec.http.HttpHeaderValues.KEEP_ALIVE;
13 import static io.netty.handler.codec.http.HttpMethod.GET;
14 import static io.netty.handler.codec.http.HttpResponseStatus.NOT_FOUND;
15 import static io.netty.handler.codec.http.HttpResponseStatus.OK;
16 import static io.netty.handler.codec.http.HttpVersion.HTTP_1_1;
17 import static org.awaitility.Awaitility.await;
18 import static org.junit.jupiter.api.Assertions.assertEquals;
19 import static org.junit.jupiter.api.Assertions.assertNotNull;
20 import static org.mockito.ArgumentMatchers.any;
21 import static org.mockito.Mockito.doReturn;
22 import static org.mockito.Mockito.timeout;
23 import static org.mockito.Mockito.verify;
24 import static org.opendaylight.netconf.transport.http.ConfigUtils.clientTransportTcp;
25 import static org.opendaylight.netconf.transport.http.ConfigUtils.clientTransportTls;
26 import static org.opendaylight.netconf.transport.http.ConfigUtils.serverTransportTcp;
27 import static org.opendaylight.netconf.transport.http.ConfigUtils.serverTransportTls;
28 import static org.opendaylight.netconf.transport.http.TestUtils.freePort;
29 import static org.opendaylight.netconf.transport.http.TestUtils.generateX509CertData;
30 import static org.opendaylight.netconf.transport.http.TestUtils.invoke;
31
32 import io.netty.buffer.ByteBuf;
33 import io.netty.buffer.Unpooled;
34 import io.netty.channel.Channel;
35 import io.netty.handler.codec.http.DefaultFullHttpRequest;
36 import io.netty.handler.codec.http.DefaultFullHttpResponse;
37 import java.net.InetAddress;
38 import java.nio.charset.StandardCharsets;
39 import java.time.Duration;
40 import java.util.List;
41 import java.util.Map;
42 import java.util.concurrent.TimeUnit;
43 import java.util.function.Consumer;
44 import java.util.stream.IntStream;
45 import org.junit.jupiter.api.AfterAll;
46 import org.junit.jupiter.api.BeforeAll;
47 import org.junit.jupiter.api.BeforeEach;
48 import org.junit.jupiter.api.extension.ExtendWith;
49 import org.junit.jupiter.params.ParameterizedTest;
50 import org.junit.jupiter.params.provider.ValueSource;
51 import org.mockito.ArgumentCaptor;
52 import org.mockito.Captor;
53 import org.mockito.Mock;
54 import org.mockito.junit.jupiter.MockitoExtension;
55 import org.opendaylight.netconf.transport.api.TransportChannel;
56 import org.opendaylight.netconf.transport.api.TransportChannelListener;
57 import org.opendaylight.netconf.transport.http.EventStreamService.StartCallback;
58 import org.opendaylight.netconf.transport.tcp.BootstrapFactory;
59 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.http.client.rev240208.HttpClientStackGrouping;
60 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.http.server.rev240208.HttpServerStackGrouping;
61
62 @ExtendWith(MockitoExtension.class)
63 class SseClientServerTest {
64     private static final String USERNAME = "username";
65     private static final String PASSWORD = "pa$$W0rd";
66     private static final Map<String, String> USER_HASHES_MAP = Map.of(USERNAME, "$0$" + PASSWORD);
67     private static final String DATA_URI = "/data";
68     private static final String STREAM_URI = "/stream";
69     private static final ByteBuf OK_CONTENT = Unpooled.wrappedBuffer("OK".getBytes(StandardCharsets.UTF_8));
70     private static final String DECLINE_MESSAGE = "decline-message";
71     private static final String DATA = "data";
72     private static final List<String> DATA_VALUES = IntStream.rangeClosed(1, 10)
73         .mapToObj(num -> "value " + num).toList();
74
75     private static RequestDispatcher requestDispatcher;
76     private static BootstrapFactory bootstrapFactory;
77     private static String localAddress;
78
79     @Mock
80     private HttpServerStackGrouping serverConfig;
81     @Mock
82     private HttpClientStackGrouping clientConfig;
83     @Mock
84     private EventStreamListener eventStreamListener;
85     @Mock
86     private StartCallback startCallback;
87     @Captor
88     private ArgumentCaptor<Exception> exceptionCaptor;
89
90     private EventStreamService clientEventStreamService;
91     private TestStreamService serverEventStreamService;
92     private TestTransportListener serverTransportListener;
93     private TestTransportListener clientTransportListener;
94
95     @BeforeAll
96     static void beforeAll() {
97         bootstrapFactory = new BootstrapFactory("IntegrationTest", 0);
98         localAddress = InetAddress.getLoopbackAddress().getHostAddress();
99         requestDispatcher = (request, callback) -> {
100             final var response = DATA_URI.equals(request.uri())
101                 ? new DefaultFullHttpResponse(request.protocolVersion(), OK, OK_CONTENT.copy())
102                 : new DefaultFullHttpResponse(request.protocolVersion(), NOT_FOUND, Unpooled.EMPTY_BUFFER);
103             response.headers().set(CONTENT_LENGTH, response.content().readableBytes());
104             callback.onSuccess(response);
105         };
106     }
107
108     @AfterAll
109     static void afterAll() {
110         bootstrapFactory.close();
111     }
112
113     @BeforeEach
114     void beforeEach() {
115         clientEventStreamService = null;
116         serverEventStreamService = new TestStreamService();
117         // init SSE layer on top of HTTP layer using Transport channel listeners
118         serverTransportListener = new TestTransportListener(channel ->
119             SseUtils.enableServerSse(channel, serverEventStreamService, 0, 0));
120         clientTransportListener = new TestTransportListener(channel ->
121             SseClientServerTest.this.clientEventStreamService = SseUtils.enableClientSse(channel));
122     }
123
124     @ParameterizedTest(name = "TCP with no authorization, HTTP/2: {0}")
125     @ValueSource(booleans = {false, true})
126     void noAuthTcp(final boolean http2) throws Exception {
127         final var localPort = freePort();
128         doReturn(serverTransportTcp(localAddress, localPort)).when(serverConfig).getTransport();
129         doReturn(clientTransportTcp(localAddress, localPort)).when(clientConfig).getTransport();
130         integrationTest(http2);
131     }
132
133     @ParameterizedTest(name = "TCP with Basic authorization, HTTP/2: {0}")
134     @ValueSource(booleans = {false, true})
135     void basicAuthTcp(final boolean http2) throws Exception {
136         final var localPort = freePort();
137         doReturn(serverTransportTcp(localAddress, localPort, USER_HASHES_MAP))
138             .when(serverConfig).getTransport();
139         doReturn(clientTransportTcp(localAddress, localPort, USERNAME, PASSWORD))
140             .when(clientConfig).getTransport();
141         integrationTest(http2);
142     }
143
144     @ParameterizedTest(name = "TLS with no authorization, HTTP/2: {0}")
145     @ValueSource(booleans = {false, true})
146     void noAuthTls(final boolean http2) throws Exception {
147         final var certData = generateX509CertData("RSA");
148         final var localPort = freePort();
149         doReturn(serverTransportTls(localAddress, localPort, certData.certificate(), certData.privateKey()))
150             .when(serverConfig).getTransport();
151         doReturn(clientTransportTls(localAddress, localPort, certData.certificate())).when(clientConfig).getTransport();
152         integrationTest(http2);
153     }
154
155     @ParameterizedTest(name = "TLS with Basic authorization, HTTP/2: {0}")
156     @ValueSource(booleans = {false, true})
157     void basicAuthTls(final boolean http2) throws Exception {
158         final var certData = generateX509CertData("RSA");
159         final var localPort = freePort();
160         doReturn(serverTransportTls(localAddress, localPort, certData.certificate(), certData.privateKey(),
161             USER_HASHES_MAP)).when(serverConfig).getTransport();
162         doReturn(clientTransportTls(localAddress, localPort, certData.certificate(), USERNAME, PASSWORD))
163             .when(clientConfig).getTransport();
164         integrationTest(http2);
165     }
166
167     private void integrationTest(final boolean http2) throws Exception {
168         final var server = HTTPServer.listen(serverTransportListener, bootstrapFactory.newServerBootstrap(),
169             serverConfig, requestDispatcher).get(2, TimeUnit.SECONDS);
170         try {
171             final var client = HTTPClient.connect(clientTransportListener, bootstrapFactory.newBootstrap(),
172                 clientConfig, http2).get(2, TimeUnit.SECONDS);
173             try {
174                 await().atMost(Duration.ofSeconds(2)).until(() -> serverTransportListener.initialized);
175                 await().atMost(Duration.ofSeconds(2)).until(() -> clientTransportListener.initialized);
176                 assertNotNull(clientEventStreamService);
177
178                 // verify HTTP request/response works over current connection
179                 assertGetRequest(client);
180
181                 // request SSE with invalid URI
182                 clientEventStreamService.startEventStream(DATA_URI, eventStreamListener, startCallback);
183                 verify(startCallback, timeout(1000)).onStartFailure(exceptionCaptor.capture());
184                 final var exception = exceptionCaptor.getValue();
185                 assertNotNull(exception);
186                 assertEquals(DECLINE_MESSAGE, exception.getMessage());
187
188                 // start SSE stream with proper URI
189                 clientEventStreamService.startEventStream(STREAM_URI, eventStreamListener, startCallback);
190                 verify(startCallback, timeout(1000)).onStreamStarted(any());
191                 verify(eventStreamListener).onStreamStart();
192
193                 // send series of event fields (name:value pairs)
194                 assertNotNull(serverEventStreamService.listener);
195                 for (var value : DATA_VALUES) {
196                     serverEventStreamService.listener.onEventField(DATA, value);
197                     verify(eventStreamListener, timeout(1000)).onEventField(DATA, value);
198                 }
199
200                 // end stream while keeping connection alive
201                 serverEventStreamService.listener.onStreamEnd();
202                 verify(eventStreamListener, timeout(1000)).onStreamEnd();
203
204                 // verify HTTP request/response works on same connection
205                 assertGetRequest(client);
206
207             } finally {
208                 client.shutdown().get(2, TimeUnit.SECONDS);
209             }
210         } finally {
211             server.shutdown().get(2, TimeUnit.SECONDS);
212         }
213     }
214
215     private static void assertGetRequest(final HTTPClient client) throws Exception {
216         final var request = new DefaultFullHttpRequest(HTTP_1_1, GET, DATA_URI);
217         request.headers().set(CONNECTION, KEEP_ALIVE);
218         final var response = invoke(client, request).get(2, TimeUnit.SECONDS);
219         assertNotNull(response);
220         assertEquals(OK, response.status());
221     }
222
223     private static final class TestStreamService implements EventStreamService {
224         private EventStreamListener listener;
225
226         @Override
227         public void startEventStream(final String requestUri, final EventStreamListener eventListener,
228                 final StartCallback callback) {
229             if (STREAM_URI.equals(requestUri)) {
230                 // accept stream request
231                 listener = eventListener;
232                 callback.onStreamStarted(() -> {
233                     // no-op
234                 });
235             } else {
236                 // decline stream request
237                 callback.onStartFailure(new IllegalStateException(DECLINE_MESSAGE));
238             }
239         }
240     }
241
242     private static class TestTransportListener implements TransportChannelListener {
243         private final Consumer<Channel> initializer;
244         private volatile boolean initialized;
245
246         TestTransportListener(final Consumer<Channel> initializer) {
247             this.initializer = initializer;
248         }
249
250         @Override
251         public void onTransportChannelEstablished(final TransportChannel channel) {
252             initialized = true;
253             initializer.accept(channel.channel());
254         }
255
256         @Override
257         public void onTransportChannelFailed(final Throwable cause) {
258             throw new IllegalStateException("HTTP connection failure", cause);
259         }
260     }
261 }