/*
 * Copyright (c) 2020 Lumina Networks, Inc. All rights reserved.
 *
 * This program and the accompanying materials are made available under the
 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
 * and is available at http://www.eclipse.org/legal/epl-v10.html
 */
8 package org.opendaylight.restconf.nb.rfc8040.streams.sse;
10 import static org.junit.Assert.assertEquals;
11 import static org.junit.Assert.assertNotNull;
12 import static org.mockito.ArgumentMatchers.any;
13 import static org.mockito.ArgumentMatchers.anyBoolean;
14 import static org.mockito.ArgumentMatchers.eq;
15 import static org.mockito.Mockito.doReturn;
16 import static org.mockito.Mockito.never;
17 import static org.mockito.Mockito.times;
18 import static org.mockito.Mockito.verify;
19 import static org.mockito.Mockito.verifyNoMoreInteractions;
21 import java.io.IOException;
22 import java.util.concurrent.ScheduledExecutorService;
23 import java.util.concurrent.ScheduledFuture;
24 import java.util.concurrent.TimeUnit;
25 import org.glassfish.jersey.media.sse.EventOutput;
26 import org.glassfish.jersey.media.sse.OutboundEvent;
27 import org.junit.Test;
28 import org.junit.runner.RunWith;
29 import org.mockito.ArgumentCaptor;
30 import org.mockito.Mock;
31 import org.mockito.junit.MockitoJUnitRunner;
32 import org.opendaylight.restconf.nb.rfc8040.streams.listeners.BaseListenerInterface;
34 @RunWith(MockitoJUnitRunner.StrictStubs.class)
35 public class SSESessionHandlerTest {
37 private ScheduledExecutorService executorService;
39 private BaseListenerInterface listener;
41 private ScheduledFuture<?> pingFuture;
43 private EventOutput eventOutput;
45 private SSESessionHandler setup(final int maxFragmentSize, final int heartbeatInterval) {
46 final SSESessionHandler sseSessionHandler = new SSESessionHandler(executorService, eventOutput, listener,
47 maxFragmentSize, heartbeatInterval);
48 doReturn(pingFuture).when(executorService)
49 .scheduleWithFixedDelay(any(Runnable.class), eq((long) heartbeatInterval), eq((long) heartbeatInterval),
50 eq(TimeUnit.MILLISECONDS));
51 return sseSessionHandler;
55 public void onSSEConnectedWithEnabledPing() {
56 final int heartbeatInterval = 1000;
57 final SSESessionHandler sseSessionHandler = setup(1000, heartbeatInterval);
59 sseSessionHandler.init();
60 verify(listener).addSubscriber(sseSessionHandler);
61 verify(executorService).scheduleWithFixedDelay(any(Runnable.class), eq((long) heartbeatInterval),
62 eq((long) heartbeatInterval), eq(TimeUnit.MILLISECONDS));
66 public void onSSEConnectedWithDisabledPing() {
67 final int heartbeatInterval = 0;
68 final SSESessionHandler sseSessionHandler = setup(1000, heartbeatInterval);
70 sseSessionHandler.init();
71 verify(listener).addSubscriber(sseSessionHandler);
72 verifyNoMoreInteractions(executorService);
76 public void onSSEClosedWithOpenSession() {
77 final SSESessionHandler sseSessionHandler = setup(200, 10000);
79 sseSessionHandler.init();
80 verify(listener).addSubscriber(sseSessionHandler);
82 sseSessionHandler.close();
83 verify(listener).removeSubscriber(sseSessionHandler);
87 public void onSSECloseWithEnabledPingAndLivingSession() throws IOException {
88 final SSESessionHandler sseSessionHandler = setup(150, 8000);
89 sseSessionHandler.init();
90 doReturn(false).when(pingFuture).isCancelled();
91 doReturn(false).when(pingFuture).isDone();
93 sseSessionHandler.close();
94 verify(listener).removeSubscriber(sseSessionHandler);
95 verify(pingFuture).cancel(anyBoolean());
99 public void onSSECloseWithEnabledPingAndDeadSession() {
100 final SSESessionHandler sseSessionHandler = setup(150, 8000);
101 sseSessionHandler.init();
103 sseSessionHandler.close();
104 verify(listener).removeSubscriber(sseSessionHandler);
105 verify(pingFuture).cancel(anyBoolean());
109 public void onSSECloseWithDisabledPingAndDeadSession() {
110 final SSESessionHandler sseSessionHandler = setup(150, 8000);
111 sseSessionHandler.init();
112 doReturn(true).when(pingFuture).isDone();
114 sseSessionHandler.close();
115 verify(listener).removeSubscriber(sseSessionHandler);
116 verify(pingFuture, never()).cancel(anyBoolean());
120 public void sendDataMessageWithDisabledFragmentation() throws IOException {
121 final SSESessionHandler sseSessionHandler = setup(0, 0);
122 doReturn(false).when(eventOutput).isClosed();
123 sseSessionHandler.init();
124 final String testMessage = generateRandomStringOfLength(100);
125 sseSessionHandler.sendDataMessage(testMessage);
127 ArgumentCaptor<OutboundEvent> cap = ArgumentCaptor.forClass(OutboundEvent.class);
128 verify(eventOutput, times(1)).write(cap.capture());
129 OutboundEvent event = cap.getAllValues().get(0);
130 assertNotNull(event);
131 assertEquals(event.getData(), testMessage);
135 public void sendDataMessageWithDisabledFragAndDeadSession() throws IOException {
136 final SSESessionHandler sseSessionHandler = setup(0, 0);
137 doReturn(true).when(eventOutput).isClosed();
138 sseSessionHandler.init();
140 final String testMessage = generateRandomStringOfLength(11);
141 sseSessionHandler.sendDataMessage(testMessage);
142 ArgumentCaptor<OutboundEvent> cap = ArgumentCaptor.forClass(OutboundEvent.class);
143 verify(eventOutput, times(0)).write(cap.capture());
147 public void sendDataMessageWithEnabledFragAndSmallMessage() throws IOException {
148 final SSESessionHandler sseSessionHandler = setup(100, 0);
149 doReturn(false).when(eventOutput).isClosed();
150 sseSessionHandler.init();
152 // in both cases, fragmentation should not be applied
153 final String testMessage1 = generateRandomStringOfLength(100);
154 final String testMessage2 = generateRandomStringOfLength(50);
155 sseSessionHandler.sendDataMessage(testMessage1);
156 sseSessionHandler.sendDataMessage(testMessage2);
158 ArgumentCaptor<OutboundEvent> cap = ArgumentCaptor.forClass(OutboundEvent.class);
159 // without fragmentation there will be 2 write calls
160 verify(eventOutput, times(2)).write(cap.capture());
161 OutboundEvent event1 = cap.getAllValues().get(0);
162 OutboundEvent event2 = cap.getAllValues().get(1);
163 assertNotNull(event1);
164 assertNotNull(event2);
165 assertEquals(event1.getData(), testMessage1);
166 assertEquals(event2.getData(), testMessage2);
167 String[] lines1 = ((String) event1.getData()).split("\r\n|\r|\n");
168 assertEquals(lines1.length, 1);
169 String[] lines2 = ((String) event2.getData()).split("\r\n|\r|\n");
170 assertEquals(lines2.length, 1);
174 public void sendDataMessageWithZeroLength() {
175 final SSESessionHandler sseSessionHandler = setup(100, 0);
176 sseSessionHandler.init();
178 sseSessionHandler.sendDataMessage("");
179 verifyNoMoreInteractions(eventOutput);
183 public void sendDataMessageWithEnabledFragAndLargeMessage1() throws IOException {
184 final SSESessionHandler sseSessionHandler = setup(100, 0);
185 doReturn(false).when(eventOutput).isClosed();
186 sseSessionHandler.init();
188 // there should be 10 fragments of length 100 characters
189 final String testMessage = generateRandomStringOfLength(1000);
190 sseSessionHandler.sendDataMessage(testMessage);
191 ArgumentCaptor<OutboundEvent> cap = ArgumentCaptor.forClass(OutboundEvent.class);
192 // SSE automatically send fragmented packet ended with new line character due to eventOutput
193 // have only 1 write call
194 verify(eventOutput, times(1)).write(cap.capture());
195 OutboundEvent event = cap.getAllValues().get(0);
196 assertNotNull(event);
197 String[] lines = ((String) event.getData()).split("\r\n|\r|\n");
198 assertEquals(lines.length, 10);
201 private static String generateRandomStringOfLength(final int length) {
202 final String alphabet = "ABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789abcdefghijklmnopqrstuvxyz";
203 final StringBuilder sb = new StringBuilder(length);
204 for (int i = 0; i < length; i++) {
205 int index = (int) (alphabet.length() * Math.random());
206 sb.append(alphabet.charAt(index));
208 return sb.toString();