2 * Copyright (c) 2020 Lumina Networks, Inc. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.restconf.nb.rfc8040.streams;
10 import static org.junit.Assert.assertEquals;
11 import static org.junit.Assert.assertNotNull;
12 import static org.mockito.ArgumentMatchers.any;
13 import static org.mockito.ArgumentMatchers.anyBoolean;
14 import static org.mockito.ArgumentMatchers.eq;
15 import static org.mockito.Mockito.doAnswer;
16 import static org.mockito.Mockito.doCallRealMethod;
17 import static org.mockito.Mockito.doReturn;
18 import static org.mockito.Mockito.never;
19 import static org.mockito.Mockito.times;
20 import static org.mockito.Mockito.verify;
21 import static org.mockito.Mockito.verifyNoMoreInteractions;
23 import java.io.IOException;
24 import java.util.concurrent.ScheduledExecutorService;
25 import java.util.concurrent.ScheduledFuture;
26 import java.util.concurrent.TimeUnit;
27 import javax.ws.rs.sse.Sse;
28 import javax.ws.rs.sse.SseEventSink;
29 import org.glassfish.jersey.media.sse.OutboundEvent;
30 import org.junit.Test;
31 import org.junit.runner.RunWith;
32 import org.mockito.ArgumentCaptor;
33 import org.mockito.Mock;
34 import org.mockito.junit.MockitoJUnitRunner;
35 import org.opendaylight.restconf.nb.rfc8040.streams.listeners.BaseListenerInterface;
37 @RunWith(MockitoJUnitRunner.StrictStubs.class)
38 public class SSESessionHandlerTest {
40 private ScheduledExecutorService executorService;
42 private BaseListenerInterface listener;
44 private ScheduledFuture<?> pingFuture;
46 private SseEventSink eventSink;
50 private SSESessionHandler setup(final int maxFragmentSize, final int heartbeatInterval) {
51 doCallRealMethod().when(sse).newEvent(any());
52 doAnswer(inv -> new OutboundEvent.Builder()).when(sse).newEventBuilder();
54 final SSESessionHandler sseSessionHandler = new SSESessionHandler(executorService, eventSink, sse, listener,
55 maxFragmentSize, heartbeatInterval);
56 doReturn(pingFuture).when(executorService)
57 .scheduleWithFixedDelay(any(Runnable.class), eq((long) heartbeatInterval), eq((long) heartbeatInterval),
58 eq(TimeUnit.MILLISECONDS));
59 return sseSessionHandler;
63 public void onSSEConnectedWithEnabledPing() {
64 final int heartbeatInterval = 1000;
65 final SSESessionHandler sseSessionHandler = setup(1000, heartbeatInterval);
67 sseSessionHandler.init();
68 verify(listener).addSubscriber(sseSessionHandler);
69 verify(executorService).scheduleWithFixedDelay(any(Runnable.class), eq((long) heartbeatInterval),
70 eq((long) heartbeatInterval), eq(TimeUnit.MILLISECONDS));
74 public void onSSEConnectedWithDisabledPing() {
75 final int heartbeatInterval = 0;
76 final SSESessionHandler sseSessionHandler = setup(1000, heartbeatInterval);
78 sseSessionHandler.init();
79 verify(listener).addSubscriber(sseSessionHandler);
80 verifyNoMoreInteractions(executorService);
84 public void onSSEClosedWithOpenSession() {
85 final SSESessionHandler sseSessionHandler = setup(200, 10000);
87 sseSessionHandler.init();
88 verify(listener).addSubscriber(sseSessionHandler);
90 sseSessionHandler.close();
91 verify(listener).removeSubscriber(sseSessionHandler);
95 public void onSSECloseWithEnabledPingAndLivingSession() throws IOException {
96 final SSESessionHandler sseSessionHandler = setup(150, 8000);
97 sseSessionHandler.init();
98 doReturn(false).when(pingFuture).isCancelled();
99 doReturn(false).when(pingFuture).isDone();
101 sseSessionHandler.close();
102 verify(listener).removeSubscriber(sseSessionHandler);
103 verify(pingFuture).cancel(anyBoolean());
107 public void onSSECloseWithEnabledPingAndDeadSession() {
108 final SSESessionHandler sseSessionHandler = setup(150, 8000);
109 sseSessionHandler.init();
111 sseSessionHandler.close();
112 verify(listener).removeSubscriber(sseSessionHandler);
113 verify(pingFuture).cancel(anyBoolean());
117 public void onSSECloseWithDisabledPingAndDeadSession() {
118 final SSESessionHandler sseSessionHandler = setup(150, 8000);
119 sseSessionHandler.init();
120 doReturn(true).when(pingFuture).isDone();
122 sseSessionHandler.close();
123 verify(listener).removeSubscriber(sseSessionHandler);
124 verify(pingFuture, never()).cancel(anyBoolean());
128 public void sendDataMessageWithDisabledFragmentation() throws IOException {
129 final SSESessionHandler sseSessionHandler = setup(0, 0);
130 doReturn(false).when(eventSink).isClosed();
131 sseSessionHandler.init();
132 final String testMessage = generateRandomStringOfLength(100);
133 sseSessionHandler.sendDataMessage(testMessage);
135 ArgumentCaptor<OutboundEvent> cap = ArgumentCaptor.forClass(OutboundEvent.class);
136 verify(eventSink, times(1)).send(cap.capture());
137 OutboundEvent event = cap.getAllValues().get(0);
138 assertNotNull(event);
139 assertEquals(event.getData(), testMessage);
143 public void sendDataMessageWithDisabledFragAndDeadSession() throws IOException {
144 final SSESessionHandler sseSessionHandler = setup(0, 0);
145 doReturn(true).when(eventSink).isClosed();
146 sseSessionHandler.init();
148 final String testMessage = generateRandomStringOfLength(11);
149 sseSessionHandler.sendDataMessage(testMessage);
150 ArgumentCaptor<OutboundEvent> cap = ArgumentCaptor.forClass(OutboundEvent.class);
151 verify(eventSink, times(0)).send(cap.capture());
155 public void sendDataMessageWithEnabledFragAndSmallMessage() throws IOException {
156 final SSESessionHandler sseSessionHandler = setup(100, 0);
157 doReturn(false).when(eventSink).isClosed();
158 sseSessionHandler.init();
160 // in both cases, fragmentation should not be applied
161 final String testMessage1 = generateRandomStringOfLength(100);
162 final String testMessage2 = generateRandomStringOfLength(50);
163 sseSessionHandler.sendDataMessage(testMessage1);
164 sseSessionHandler.sendDataMessage(testMessage2);
166 ArgumentCaptor<OutboundEvent> cap = ArgumentCaptor.forClass(OutboundEvent.class);
167 // without fragmentation there will be 2 write calls
168 verify(eventSink, times(2)).send(cap.capture());
169 OutboundEvent event1 = cap.getAllValues().get(0);
170 OutboundEvent event2 = cap.getAllValues().get(1);
171 assertNotNull(event1);
172 assertNotNull(event2);
173 assertEquals(event1.getData(), testMessage1);
174 assertEquals(event2.getData(), testMessage2);
175 String[] lines1 = ((String) event1.getData()).split("\r\n|\r|\n");
176 assertEquals(lines1.length, 1);
177 String[] lines2 = ((String) event2.getData()).split("\r\n|\r|\n");
178 assertEquals(lines2.length, 1);
182 public void sendDataMessageWithZeroLength() {
183 final SSESessionHandler sseSessionHandler = setup(100, 0);
184 sseSessionHandler.init();
186 sseSessionHandler.sendDataMessage("");
187 verifyNoMoreInteractions(eventSink);
191 public void sendDataMessageWithEnabledFragAndLargeMessage1() throws IOException {
192 final SSESessionHandler sseSessionHandler = setup(100, 0);
193 doReturn(false).when(eventSink).isClosed();
194 sseSessionHandler.init();
196 // there should be 10 fragments of length 100 characters
197 final String testMessage = generateRandomStringOfLength(1000);
198 sseSessionHandler.sendDataMessage(testMessage);
199 ArgumentCaptor<OutboundEvent> cap = ArgumentCaptor.forClass(OutboundEvent.class);
200 // SSE automatically send fragmented packet ended with new line character due to eventOutput
201 // have only 1 write call
202 verify(eventSink, times(1)).send(cap.capture());
203 OutboundEvent event = cap.getAllValues().get(0);
204 assertNotNull(event);
205 String[] lines = ((String) event.getData()).split("\r\n|\r|\n");
206 assertEquals(lines.length, 10);
209 private static String generateRandomStringOfLength(final int length) {
210 final String alphabet = "ABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789abcdefghijklmnopqrstuvxyz";
211 final StringBuilder sb = new StringBuilder(length);
212 for (int i = 0; i < length; i++) {
213 int index = (int) (alphabet.length() * Math.random());
214 sb.append(alphabet.charAt(index));
216 return sb.toString();