2 * Copyright (c) 2020 Lumina Networks, Inc. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.restconf.nb.rfc8040.streams;
10 import static org.junit.Assert.assertEquals;
11 import static org.junit.Assert.assertNotNull;
12 import static org.mockito.ArgumentMatchers.any;
13 import static org.mockito.ArgumentMatchers.anyBoolean;
14 import static org.mockito.ArgumentMatchers.eq;
15 import static org.mockito.Mockito.doAnswer;
16 import static org.mockito.Mockito.doReturn;
17 import static org.mockito.Mockito.never;
18 import static org.mockito.Mockito.times;
19 import static org.mockito.Mockito.verify;
20 import static org.mockito.Mockito.verifyNoMoreInteractions;
22 import java.io.IOException;
23 import java.util.concurrent.ScheduledExecutorService;
24 import java.util.concurrent.ScheduledFuture;
25 import java.util.concurrent.TimeUnit;
26 import javax.ws.rs.sse.Sse;
27 import javax.ws.rs.sse.SseEventSink;
28 import org.glassfish.jersey.media.sse.OutboundEvent;
29 import org.junit.Test;
30 import org.junit.runner.RunWith;
31 import org.mockito.ArgumentCaptor;
32 import org.mockito.Mock;
33 import org.mockito.junit.MockitoJUnitRunner;
34 import org.opendaylight.restconf.nb.rfc8040.streams.listeners.BaseListenerInterface;
36 @RunWith(MockitoJUnitRunner.StrictStubs.class)
37 public class SSESessionHandlerTest {
39 private ScheduledExecutorService executorService;
41 private BaseListenerInterface listener;
43 private ScheduledFuture<?> pingFuture;
45 private SseEventSink eventSink;
49 private SSESessionHandler setup(final int maxFragmentSize, final int heartbeatInterval) {
50 doAnswer(inv -> new OutboundEvent.Builder().data(String.class, inv.getArgument(0, String.class)).build())
51 .when(sse).newEvent(any());
53 final SSESessionHandler sseSessionHandler = new SSESessionHandler(executorService, eventSink, sse, listener,
54 maxFragmentSize, heartbeatInterval);
55 doReturn(pingFuture).when(executorService)
56 .scheduleWithFixedDelay(any(Runnable.class), eq((long) heartbeatInterval), eq((long) heartbeatInterval),
57 eq(TimeUnit.MILLISECONDS));
58 return sseSessionHandler;
62 public void onSSEConnectedWithEnabledPing() {
63 final int heartbeatInterval = 1000;
64 final SSESessionHandler sseSessionHandler = setup(1000, heartbeatInterval);
66 sseSessionHandler.init();
67 verify(listener).addSubscriber(sseSessionHandler);
68 verify(executorService).scheduleWithFixedDelay(any(Runnable.class), eq((long) heartbeatInterval),
69 eq((long) heartbeatInterval), eq(TimeUnit.MILLISECONDS));
73 public void onSSEConnectedWithDisabledPing() {
74 final int heartbeatInterval = 0;
75 final SSESessionHandler sseSessionHandler = setup(1000, heartbeatInterval);
77 sseSessionHandler.init();
78 verify(listener).addSubscriber(sseSessionHandler);
79 verifyNoMoreInteractions(executorService);
83 public void onSSEClosedWithOpenSession() {
84 final SSESessionHandler sseSessionHandler = setup(200, 10000);
86 sseSessionHandler.init();
87 verify(listener).addSubscriber(sseSessionHandler);
89 sseSessionHandler.close();
90 verify(listener).removeSubscriber(sseSessionHandler);
94 public void onSSECloseWithEnabledPingAndLivingSession() throws IOException {
95 final SSESessionHandler sseSessionHandler = setup(150, 8000);
96 sseSessionHandler.init();
97 doReturn(false).when(pingFuture).isCancelled();
98 doReturn(false).when(pingFuture).isDone();
100 sseSessionHandler.close();
101 verify(listener).removeSubscriber(sseSessionHandler);
102 verify(pingFuture).cancel(anyBoolean());
106 public void onSSECloseWithEnabledPingAndDeadSession() {
107 final SSESessionHandler sseSessionHandler = setup(150, 8000);
108 sseSessionHandler.init();
110 sseSessionHandler.close();
111 verify(listener).removeSubscriber(sseSessionHandler);
112 verify(pingFuture).cancel(anyBoolean());
116 public void onSSECloseWithDisabledPingAndDeadSession() {
117 final SSESessionHandler sseSessionHandler = setup(150, 8000);
118 sseSessionHandler.init();
119 doReturn(true).when(pingFuture).isDone();
121 sseSessionHandler.close();
122 verify(listener).removeSubscriber(sseSessionHandler);
123 verify(pingFuture, never()).cancel(anyBoolean());
127 public void sendDataMessageWithDisabledFragmentation() throws IOException {
128 final SSESessionHandler sseSessionHandler = setup(0, 0);
129 doReturn(false).when(eventSink).isClosed();
130 sseSessionHandler.init();
131 final String testMessage = generateRandomStringOfLength(100);
132 sseSessionHandler.sendDataMessage(testMessage);
134 ArgumentCaptor<OutboundEvent> cap = ArgumentCaptor.forClass(OutboundEvent.class);
135 verify(eventSink, times(1)).send(cap.capture());
136 OutboundEvent event = cap.getAllValues().get(0);
137 assertNotNull(event);
138 assertEquals(event.getData(), testMessage);
142 public void sendDataMessageWithDisabledFragAndDeadSession() throws IOException {
143 final SSESessionHandler sseSessionHandler = setup(0, 0);
144 doReturn(true).when(eventSink).isClosed();
145 sseSessionHandler.init();
147 final String testMessage = generateRandomStringOfLength(11);
148 sseSessionHandler.sendDataMessage(testMessage);
149 ArgumentCaptor<OutboundEvent> cap = ArgumentCaptor.forClass(OutboundEvent.class);
150 verify(eventSink, times(0)).send(cap.capture());
154 public void sendDataMessageWithEnabledFragAndSmallMessage() throws IOException {
155 final SSESessionHandler sseSessionHandler = setup(100, 0);
156 doReturn(false).when(eventSink).isClosed();
157 sseSessionHandler.init();
159 // in both cases, fragmentation should not be applied
160 final String testMessage1 = generateRandomStringOfLength(100);
161 final String testMessage2 = generateRandomStringOfLength(50);
162 sseSessionHandler.sendDataMessage(testMessage1);
163 sseSessionHandler.sendDataMessage(testMessage2);
165 ArgumentCaptor<OutboundEvent> cap = ArgumentCaptor.forClass(OutboundEvent.class);
166 // without fragmentation there will be 2 write calls
167 verify(eventSink, times(2)).send(cap.capture());
168 OutboundEvent event1 = cap.getAllValues().get(0);
169 OutboundEvent event2 = cap.getAllValues().get(1);
170 assertNotNull(event1);
171 assertNotNull(event2);
172 assertEquals(event1.getData(), testMessage1);
173 assertEquals(event2.getData(), testMessage2);
174 String[] lines1 = ((String) event1.getData()).split("\r\n|\r|\n");
175 assertEquals(lines1.length, 1);
176 String[] lines2 = ((String) event2.getData()).split("\r\n|\r|\n");
177 assertEquals(lines2.length, 1);
181 public void sendDataMessageWithZeroLength() {
182 final SSESessionHandler sseSessionHandler = setup(100, 0);
183 sseSessionHandler.init();
185 sseSessionHandler.sendDataMessage("");
186 verifyNoMoreInteractions(eventSink);
190 public void sendDataMessageWithEnabledFragAndLargeMessage1() throws IOException {
191 final SSESessionHandler sseSessionHandler = setup(100, 0);
192 doReturn(false).when(eventSink).isClosed();
193 sseSessionHandler.init();
195 // there should be 10 fragments of length 100 characters
196 final String testMessage = generateRandomStringOfLength(1000);
197 sseSessionHandler.sendDataMessage(testMessage);
198 ArgumentCaptor<OutboundEvent> cap = ArgumentCaptor.forClass(OutboundEvent.class);
199 // SSE automatically send fragmented packet ended with new line character due to eventOutput
200 // have only 1 write call
201 verify(eventSink, times(1)).send(cap.capture());
202 OutboundEvent event = cap.getAllValues().get(0);
203 assertNotNull(event);
204 String[] lines = ((String) event.getData()).split("\r\n|\r|\n");
205 assertEquals(lines.length, 10);
208 private static String generateRandomStringOfLength(final int length) {
209 final String alphabet = "ABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789abcdefghijklmnopqrstuvxyz";
210 final StringBuilder sb = new StringBuilder(length);
211 for (int i = 0; i < length; i++) {
212 int index = (int) (alphabet.length() * Math.random());
213 sb.append(alphabet.charAt(index));
215 return sb.toString();