2 * Copyright (c) 2020 Lumina Networks, Inc. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.restconf.nb.rfc8040.streams;
10 import static org.junit.jupiter.api.Assertions.assertEquals;
11 import static org.junit.jupiter.api.Assertions.assertNotNull;
12 import static org.mockito.ArgumentMatchers.any;
13 import static org.mockito.ArgumentMatchers.eq;
14 import static org.mockito.Mockito.doAnswer;
15 import static org.mockito.Mockito.doNothing;
16 import static org.mockito.Mockito.doReturn;
17 import static org.mockito.Mockito.times;
18 import static org.mockito.Mockito.verify;
19 import static org.mockito.Mockito.verifyNoMoreInteractions;
21 import java.util.concurrent.TimeUnit;
22 import javax.ws.rs.sse.Sse;
23 import javax.ws.rs.sse.SseEventSink;
24 import org.glassfish.jersey.media.sse.OutboundEvent;
25 import org.junit.jupiter.api.Test;
26 import org.junit.jupiter.api.extension.ExtendWith;
27 import org.mockito.ArgumentCaptor;
28 import org.mockito.Mock;
29 import org.mockito.junit.jupiter.MockitoExtension;
30 import org.opendaylight.restconf.server.api.EventStreamGetParams;
31 import org.opendaylight.restconf.server.spi.RestconfStream;
32 import org.opendaylight.restconf.server.spi.RestconfStream.EncodingName;
33 import org.opendaylight.yangtools.concepts.Registration;
35 @ExtendWith(MockitoExtension.class)
36 class SSESessionHandlerTest {
38 private PingExecutor pingExecutor;
40 private RestconfStream<?> stream;
42 private Registration pingRegistration;
44 private SseEventSink eventSink;
48 private Registration reg;
50 private SSESender setup(final int maxFragmentSize, final long heartbeatInterval) throws Exception {
51 final var sseSessionHandler = new SSESender(pingExecutor, eventSink, sse, stream,
52 EncodingName.RFC8040_XML, new EventStreamGetParams(null, null, null, null, null, null, null),
53 maxFragmentSize, heartbeatInterval);
54 doReturn(reg).when(stream).addSubscriber(eq(sseSessionHandler), any(), any());
55 return sseSessionHandler;
58 private void setupEvent() {
59 doAnswer(inv -> new OutboundEvent.Builder().data(String.class, inv.getArgument(0, String.class)).build())
60 .when(sse).newEvent(any());
63 private void setupPing(final long maxFragmentSize, final long heartbeatInterval) {
64 doReturn(pingRegistration).when(pingExecutor)
65 .startPingProcess(any(Runnable.class), eq(heartbeatInterval), eq(TimeUnit.MILLISECONDS));
69 void onSSEConnectedWithEnabledPing() throws Exception {
70 final var heartbeatInterval = 1000L;
71 final var sseSessionHandler = setup(1000, heartbeatInterval);
73 sseSessionHandler.init();
74 verify(pingExecutor).startPingProcess(any(Runnable.class), eq(heartbeatInterval), eq(TimeUnit.MILLISECONDS));
78 void onSSEConnectedWithDisabledPing() throws Exception {
79 final int heartbeatInterval = 0;
80 final var sseSessionHandler = setup(1000, heartbeatInterval);
82 sseSessionHandler.init();
83 verifyNoMoreInteractions(pingExecutor);
87 void onSSEClosedWithOpenSession() throws Exception {
88 final var sseSessionHandler = setup(200, 10000);
90 sseSessionHandler.init();
92 sseSessionHandler.close();
97 void onSSECloseWithEnabledPingAndLivingSession() throws Exception {
98 final var sseSessionHandler = setup(150, 8000);
100 sseSessionHandler.init();
102 doNothing().when(pingRegistration).close();
103 sseSessionHandler.close();
108 void onSSECloseWithEnabledPingAndDeadSession() throws Exception {
109 final var sseSessionHandler = setup(150, 8000);
110 setupPing(150, 8000);
111 sseSessionHandler.init();
113 doNothing().when(pingRegistration).close();
114 sseSessionHandler.close();
119 void onSSECloseWithDisabledPingAndDeadSession() throws Exception {
120 final var sseSessionHandler = setup(150, 8000);
121 sseSessionHandler.init();
123 sseSessionHandler.close();
125 verifyNoMoreInteractions(pingRegistration);
129 void sendDataMessageWithDisabledFragmentation() throws Exception {
130 final var sseSessionHandler = setup(0, 0);
131 doReturn(false).when(eventSink).isClosed();
133 sseSessionHandler.init();
134 final String testMessage = generateRandomStringOfLength(100);
135 sseSessionHandler.sendDataMessage(testMessage);
137 final var cap = ArgumentCaptor.forClass(OutboundEvent.class);
138 verify(eventSink, times(1)).send(cap.capture());
139 final var event = cap.getAllValues().get(0);
140 assertNotNull(event);
141 assertEquals(event.getData(), testMessage);
145 void sendDataMessageWithDisabledFragAndDeadSession() throws Exception {
146 final var sseSessionHandler = setup(0, 0);
147 doReturn(true).when(eventSink).isClosed();
148 sseSessionHandler.init();
150 final String testMessage = generateRandomStringOfLength(11);
151 sseSessionHandler.sendDataMessage(testMessage);
152 verify(eventSink, times(0)).send(any());
156 void sendDataMessageWithEnabledFragAndSmallMessage() throws Exception {
157 final var sseSessionHandler = setup(100, 0);
158 doReturn(false).when(eventSink).isClosed();
160 sseSessionHandler.init();
162 // in both cases, fragmentation should not be applied
163 final String testMessage1 = generateRandomStringOfLength(100);
164 final String testMessage2 = generateRandomStringOfLength(50);
165 sseSessionHandler.sendDataMessage(testMessage1);
166 sseSessionHandler.sendDataMessage(testMessage2);
168 final var cap = ArgumentCaptor.forClass(OutboundEvent.class);
169 // without fragmentation there will be 2 write calls
170 verify(eventSink, times(2)).send(cap.capture());
171 OutboundEvent event1 = cap.getAllValues().get(0);
172 OutboundEvent event2 = cap.getAllValues().get(1);
173 assertNotNull(event1);
174 assertNotNull(event2);
175 assertEquals(event1.getData(), testMessage1);
176 assertEquals(event2.getData(), testMessage2);
177 String[] lines1 = ((String) event1.getData()).split("\r\n|\r|\n");
178 assertEquals(lines1.length, 1);
179 String[] lines2 = ((String) event2.getData()).split("\r\n|\r|\n");
180 assertEquals(lines2.length, 1);
184 void sendDataMessageWithZeroLength() throws Exception {
185 final var sseSessionHandler = setup(100, 0);
186 sseSessionHandler.init();
188 sseSessionHandler.sendDataMessage("");
189 verifyNoMoreInteractions(eventSink);
193 void sendDataMessageWithEnabledFragAndLargeMessage1() throws Exception {
194 final var sseSessionHandler = setup(100, 0);
195 doReturn(false).when(eventSink).isClosed();
197 sseSessionHandler.init();
199 // there should be 10 fragments of length 100 characters
200 final String testMessage = generateRandomStringOfLength(1000);
201 sseSessionHandler.sendDataMessage(testMessage);
202 final var cap = ArgumentCaptor.forClass(OutboundEvent.class);
203 // SSE automatically send fragmented packet ended with new line character due to eventOutput
204 // have only 1 write call
205 verify(eventSink, times(1)).send(cap.capture());
206 OutboundEvent event = cap.getAllValues().get(0);
207 assertNotNull(event);
208 String[] lines = ((String) event.getData()).split("\r\n|\r|\n");
209 assertEquals(lines.length, 10);
212 private static String generateRandomStringOfLength(final int length) {
213 final var alphabet = "ABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789abcdefghijklmnopqrstuvxyz";
214 final var sb = new StringBuilder(length);
215 for (int i = 0; i < length; i++) {
216 int index = (int) (alphabet.length() * Math.random());
217 sb.append(alphabet.charAt(index));
219 return sb.toString();