/*
 * Copyright (c) 2020 Lumina Networks, Inc. All rights reserved.
 *
 * This program and the accompanying materials are made available under the
 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
 * and is available at http://www.eclipse.org/legal/epl-v10.html
 */
8 package org.opendaylight.restconf.nb.rfc8040.streams;
10 import static org.junit.Assert.assertEquals;
11 import static org.junit.Assert.assertNotNull;
12 import static org.mockito.ArgumentMatchers.any;
13 import static org.mockito.ArgumentMatchers.anyBoolean;
14 import static org.mockito.ArgumentMatchers.eq;
15 import static org.mockito.Mockito.doAnswer;
16 import static org.mockito.Mockito.doReturn;
17 import static org.mockito.Mockito.never;
18 import static org.mockito.Mockito.times;
19 import static org.mockito.Mockito.verify;
20 import static org.mockito.Mockito.verifyNoMoreInteractions;
22 import java.io.IOException;
23 import java.util.concurrent.ScheduledExecutorService;
24 import java.util.concurrent.ScheduledFuture;
25 import java.util.concurrent.TimeUnit;
26 import javax.ws.rs.sse.Sse;
27 import javax.ws.rs.sse.SseEventSink;
28 import org.glassfish.jersey.media.sse.OutboundEvent;
29 import org.junit.Test;
30 import org.junit.runner.RunWith;
31 import org.mockito.ArgumentCaptor;
32 import org.mockito.Mock;
33 import org.mockito.junit.MockitoJUnitRunner;
35 @RunWith(MockitoJUnitRunner.StrictStubs.class)
36 public class SSESessionHandlerTest {
38 private ScheduledExecutorService executorService;
40 private BaseListenerInterface listener;
42 private ScheduledFuture<?> pingFuture;
44 private SseEventSink eventSink;
48 private SSESessionHandler setup(final int maxFragmentSize, final int heartbeatInterval) {
49 doAnswer(inv -> new OutboundEvent.Builder().data(String.class, inv.getArgument(0, String.class)).build())
50 .when(sse).newEvent(any());
52 final SSESessionHandler sseSessionHandler = new SSESessionHandler(executorService, eventSink, sse, listener,
53 maxFragmentSize, heartbeatInterval);
54 doReturn(pingFuture).when(executorService)
55 .scheduleWithFixedDelay(any(Runnable.class), eq((long) heartbeatInterval), eq((long) heartbeatInterval),
56 eq(TimeUnit.MILLISECONDS));
57 return sseSessionHandler;
61 public void onSSEConnectedWithEnabledPing() {
62 final int heartbeatInterval = 1000;
63 final SSESessionHandler sseSessionHandler = setup(1000, heartbeatInterval);
65 sseSessionHandler.init();
66 verify(listener).addSubscriber(sseSessionHandler);
67 verify(executorService).scheduleWithFixedDelay(any(Runnable.class), eq((long) heartbeatInterval),
68 eq((long) heartbeatInterval), eq(TimeUnit.MILLISECONDS));
72 public void onSSEConnectedWithDisabledPing() {
73 final int heartbeatInterval = 0;
74 final SSESessionHandler sseSessionHandler = setup(1000, heartbeatInterval);
76 sseSessionHandler.init();
77 verify(listener).addSubscriber(sseSessionHandler);
78 verifyNoMoreInteractions(executorService);
82 public void onSSEClosedWithOpenSession() {
83 final SSESessionHandler sseSessionHandler = setup(200, 10000);
85 sseSessionHandler.init();
86 verify(listener).addSubscriber(sseSessionHandler);
88 sseSessionHandler.close();
89 verify(listener).removeSubscriber(sseSessionHandler);
93 public void onSSECloseWithEnabledPingAndLivingSession() throws IOException {
94 final SSESessionHandler sseSessionHandler = setup(150, 8000);
95 sseSessionHandler.init();
96 doReturn(false).when(pingFuture).isCancelled();
97 doReturn(false).when(pingFuture).isDone();
99 sseSessionHandler.close();
100 verify(listener).removeSubscriber(sseSessionHandler);
101 verify(pingFuture).cancel(anyBoolean());
105 public void onSSECloseWithEnabledPingAndDeadSession() {
106 final SSESessionHandler sseSessionHandler = setup(150, 8000);
107 sseSessionHandler.init();
109 sseSessionHandler.close();
110 verify(listener).removeSubscriber(sseSessionHandler);
111 verify(pingFuture).cancel(anyBoolean());
115 public void onSSECloseWithDisabledPingAndDeadSession() {
116 final SSESessionHandler sseSessionHandler = setup(150, 8000);
117 sseSessionHandler.init();
118 doReturn(true).when(pingFuture).isDone();
120 sseSessionHandler.close();
121 verify(listener).removeSubscriber(sseSessionHandler);
122 verify(pingFuture, never()).cancel(anyBoolean());
126 public void sendDataMessageWithDisabledFragmentation() throws IOException {
127 final SSESessionHandler sseSessionHandler = setup(0, 0);
128 doReturn(false).when(eventSink).isClosed();
129 sseSessionHandler.init();
130 final String testMessage = generateRandomStringOfLength(100);
131 sseSessionHandler.sendDataMessage(testMessage);
133 ArgumentCaptor<OutboundEvent> cap = ArgumentCaptor.forClass(OutboundEvent.class);
134 verify(eventSink, times(1)).send(cap.capture());
135 OutboundEvent event = cap.getAllValues().get(0);
136 assertNotNull(event);
137 assertEquals(event.getData(), testMessage);
141 public void sendDataMessageWithDisabledFragAndDeadSession() throws IOException {
142 final SSESessionHandler sseSessionHandler = setup(0, 0);
143 doReturn(true).when(eventSink).isClosed();
144 sseSessionHandler.init();
146 final String testMessage = generateRandomStringOfLength(11);
147 sseSessionHandler.sendDataMessage(testMessage);
148 ArgumentCaptor<OutboundEvent> cap = ArgumentCaptor.forClass(OutboundEvent.class);
149 verify(eventSink, times(0)).send(cap.capture());
153 public void sendDataMessageWithEnabledFragAndSmallMessage() throws IOException {
154 final SSESessionHandler sseSessionHandler = setup(100, 0);
155 doReturn(false).when(eventSink).isClosed();
156 sseSessionHandler.init();
158 // in both cases, fragmentation should not be applied
159 final String testMessage1 = generateRandomStringOfLength(100);
160 final String testMessage2 = generateRandomStringOfLength(50);
161 sseSessionHandler.sendDataMessage(testMessage1);
162 sseSessionHandler.sendDataMessage(testMessage2);
164 ArgumentCaptor<OutboundEvent> cap = ArgumentCaptor.forClass(OutboundEvent.class);
165 // without fragmentation there will be 2 write calls
166 verify(eventSink, times(2)).send(cap.capture());
167 OutboundEvent event1 = cap.getAllValues().get(0);
168 OutboundEvent event2 = cap.getAllValues().get(1);
169 assertNotNull(event1);
170 assertNotNull(event2);
171 assertEquals(event1.getData(), testMessage1);
172 assertEquals(event2.getData(), testMessage2);
173 String[] lines1 = ((String) event1.getData()).split("\r\n|\r|\n");
174 assertEquals(lines1.length, 1);
175 String[] lines2 = ((String) event2.getData()).split("\r\n|\r|\n");
176 assertEquals(lines2.length, 1);
180 public void sendDataMessageWithZeroLength() {
181 final SSESessionHandler sseSessionHandler = setup(100, 0);
182 sseSessionHandler.init();
184 sseSessionHandler.sendDataMessage("");
185 verifyNoMoreInteractions(eventSink);
189 public void sendDataMessageWithEnabledFragAndLargeMessage1() throws IOException {
190 final SSESessionHandler sseSessionHandler = setup(100, 0);
191 doReturn(false).when(eventSink).isClosed();
192 sseSessionHandler.init();
194 // there should be 10 fragments of length 100 characters
195 final String testMessage = generateRandomStringOfLength(1000);
196 sseSessionHandler.sendDataMessage(testMessage);
197 ArgumentCaptor<OutboundEvent> cap = ArgumentCaptor.forClass(OutboundEvent.class);
198 // SSE automatically send fragmented packet ended with new line character due to eventOutput
199 // have only 1 write call
200 verify(eventSink, times(1)).send(cap.capture());
201 OutboundEvent event = cap.getAllValues().get(0);
202 assertNotNull(event);
203 String[] lines = ((String) event.getData()).split("\r\n|\r|\n");
204 assertEquals(lines.length, 10);
207 private static String generateRandomStringOfLength(final int length) {
208 final String alphabet = "ABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789abcdefghijklmnopqrstuvxyz";
209 final StringBuilder sb = new StringBuilder(length);
210 for (int i = 0; i < length; i++) {
211 int index = (int) (alphabet.length() * Math.random());
212 sb.append(alphabet.charAt(index));
214 return sb.toString();