316f1d781d08359b06c14711f9460b12b735192d
[netconf.git] / restconf / restconf-nb / src / test / java / org / opendaylight / restconf / nb / rfc8040 / streams / SSESessionHandlerTest.java
1 /*
2  * Copyright (c) 2020 Lumina Networks, Inc. All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.restconf.nb.rfc8040.streams;
9
10 import static org.junit.Assert.assertEquals;
11 import static org.junit.Assert.assertNotNull;
12 import static org.mockito.ArgumentMatchers.any;
13 import static org.mockito.ArgumentMatchers.anyBoolean;
14 import static org.mockito.ArgumentMatchers.eq;
15 import static org.mockito.Mockito.doAnswer;
16 import static org.mockito.Mockito.doReturn;
17 import static org.mockito.Mockito.never;
18 import static org.mockito.Mockito.times;
19 import static org.mockito.Mockito.verify;
20 import static org.mockito.Mockito.verifyNoMoreInteractions;
21
22 import java.io.IOException;
23 import java.util.concurrent.ScheduledExecutorService;
24 import java.util.concurrent.ScheduledFuture;
25 import java.util.concurrent.TimeUnit;
26 import javax.ws.rs.sse.Sse;
27 import javax.ws.rs.sse.SseEventSink;
28 import org.glassfish.jersey.media.sse.OutboundEvent;
29 import org.junit.Test;
30 import org.junit.runner.RunWith;
31 import org.mockito.ArgumentCaptor;
32 import org.mockito.Mock;
33 import org.mockito.junit.MockitoJUnitRunner;
34
35 @RunWith(MockitoJUnitRunner.StrictStubs.class)
36 public class SSESessionHandlerTest {
37     @Mock
38     private ScheduledExecutorService executorService;
39     @Mock
40     private BaseListenerInterface listener;
41     @Mock
42     private ScheduledFuture<?> pingFuture;
43     @Mock
44     private SseEventSink eventSink;
45     @Mock
46     private Sse sse;
47
48     private SSESessionHandler setup(final int maxFragmentSize, final int heartbeatInterval) {
49         doAnswer(inv -> new OutboundEvent.Builder().data(String.class, inv.getArgument(0, String.class)).build())
50             .when(sse).newEvent(any());
51
52         final SSESessionHandler sseSessionHandler = new SSESessionHandler(executorService, eventSink, sse, listener,
53             maxFragmentSize, heartbeatInterval);
54         doReturn(pingFuture).when(executorService)
55             .scheduleWithFixedDelay(any(Runnable.class), eq((long) heartbeatInterval), eq((long) heartbeatInterval),
56                 eq(TimeUnit.MILLISECONDS));
57         return sseSessionHandler;
58     }
59
60     @Test
61     public void onSSEConnectedWithEnabledPing() {
62         final int heartbeatInterval = 1000;
63         final SSESessionHandler sseSessionHandler = setup(1000, heartbeatInterval);
64
65         sseSessionHandler.init();
66         verify(listener).addSubscriber(sseSessionHandler);
67         verify(executorService).scheduleWithFixedDelay(any(Runnable.class), eq((long) heartbeatInterval),
68                 eq((long) heartbeatInterval), eq(TimeUnit.MILLISECONDS));
69     }
70
71     @Test
72     public void onSSEConnectedWithDisabledPing() {
73         final int heartbeatInterval = 0;
74         final SSESessionHandler sseSessionHandler = setup(1000, heartbeatInterval);
75
76         sseSessionHandler.init();
77         verify(listener).addSubscriber(sseSessionHandler);
78         verifyNoMoreInteractions(executorService);
79     }
80
81     @Test
82     public void onSSEClosedWithOpenSession() {
83         final SSESessionHandler sseSessionHandler = setup(200, 10000);
84
85         sseSessionHandler.init();
86         verify(listener).addSubscriber(sseSessionHandler);
87
88         sseSessionHandler.close();
89         verify(listener).removeSubscriber(sseSessionHandler);
90     }
91
92     @Test
93     public void onSSECloseWithEnabledPingAndLivingSession() throws IOException {
94         final SSESessionHandler sseSessionHandler = setup(150, 8000);
95         sseSessionHandler.init();
96         doReturn(false).when(pingFuture).isCancelled();
97         doReturn(false).when(pingFuture).isDone();
98
99         sseSessionHandler.close();
100         verify(listener).removeSubscriber(sseSessionHandler);
101         verify(pingFuture).cancel(anyBoolean());
102     }
103
104     @Test
105     public void onSSECloseWithEnabledPingAndDeadSession() {
106         final SSESessionHandler sseSessionHandler = setup(150, 8000);
107         sseSessionHandler.init();
108
109         sseSessionHandler.close();
110         verify(listener).removeSubscriber(sseSessionHandler);
111         verify(pingFuture).cancel(anyBoolean());
112     }
113
114     @Test
115     public void onSSECloseWithDisabledPingAndDeadSession() {
116         final SSESessionHandler sseSessionHandler = setup(150, 8000);
117         sseSessionHandler.init();
118         doReturn(true).when(pingFuture).isDone();
119
120         sseSessionHandler.close();
121         verify(listener).removeSubscriber(sseSessionHandler);
122         verify(pingFuture, never()).cancel(anyBoolean());
123     }
124
125     @Test
126     public void sendDataMessageWithDisabledFragmentation() throws IOException {
127         final SSESessionHandler sseSessionHandler = setup(0, 0);
128         doReturn(false).when(eventSink).isClosed();
129         sseSessionHandler.init();
130         final String testMessage = generateRandomStringOfLength(100);
131         sseSessionHandler.sendDataMessage(testMessage);
132
133         ArgumentCaptor<OutboundEvent> cap = ArgumentCaptor.forClass(OutboundEvent.class);
134         verify(eventSink, times(1)).send(cap.capture());
135         OutboundEvent event = cap.getAllValues().get(0);
136         assertNotNull(event);
137         assertEquals(event.getData(), testMessage);
138     }
139
140     @Test
141     public void sendDataMessageWithDisabledFragAndDeadSession() throws IOException {
142         final SSESessionHandler sseSessionHandler = setup(0, 0);
143         doReturn(true).when(eventSink).isClosed();
144         sseSessionHandler.init();
145
146         final String testMessage = generateRandomStringOfLength(11);
147         sseSessionHandler.sendDataMessage(testMessage);
148         ArgumentCaptor<OutboundEvent> cap = ArgumentCaptor.forClass(OutboundEvent.class);
149         verify(eventSink, times(0)).send(cap.capture());
150     }
151
152     @Test
153     public void sendDataMessageWithEnabledFragAndSmallMessage() throws IOException {
154         final SSESessionHandler sseSessionHandler = setup(100, 0);
155         doReturn(false).when(eventSink).isClosed();
156         sseSessionHandler.init();
157
158         // in both cases, fragmentation should not be applied
159         final String testMessage1 = generateRandomStringOfLength(100);
160         final String testMessage2 = generateRandomStringOfLength(50);
161         sseSessionHandler.sendDataMessage(testMessage1);
162         sseSessionHandler.sendDataMessage(testMessage2);
163
164         ArgumentCaptor<OutboundEvent> cap = ArgumentCaptor.forClass(OutboundEvent.class);
165         // without fragmentation there will be 2 write calls
166         verify(eventSink, times(2)).send(cap.capture());
167         OutboundEvent event1 = cap.getAllValues().get(0);
168         OutboundEvent event2 = cap.getAllValues().get(1);
169         assertNotNull(event1);
170         assertNotNull(event2);
171         assertEquals(event1.getData(), testMessage1);
172         assertEquals(event2.getData(), testMessage2);
173         String[] lines1 = ((String) event1.getData()).split("\r\n|\r|\n");
174         assertEquals(lines1.length, 1);
175         String[] lines2 = ((String) event2.getData()).split("\r\n|\r|\n");
176         assertEquals(lines2.length, 1);
177     }
178
179     @Test
180     public void sendDataMessageWithZeroLength() {
181         final SSESessionHandler sseSessionHandler = setup(100, 0);
182         sseSessionHandler.init();
183
184         sseSessionHandler.sendDataMessage("");
185         verifyNoMoreInteractions(eventSink);
186     }
187
188     @Test
189     public void sendDataMessageWithEnabledFragAndLargeMessage1() throws IOException {
190         final SSESessionHandler sseSessionHandler = setup(100, 0);
191         doReturn(false).when(eventSink).isClosed();
192         sseSessionHandler.init();
193
194         // there should be 10 fragments of length 100 characters
195         final String testMessage = generateRandomStringOfLength(1000);
196         sseSessionHandler.sendDataMessage(testMessage);
197         ArgumentCaptor<OutboundEvent> cap = ArgumentCaptor.forClass(OutboundEvent.class);
198         // SSE automatically send fragmented packet ended with new line character due to eventOutput
199         // have only 1 write call
200         verify(eventSink, times(1)).send(cap.capture());
201         OutboundEvent event = cap.getAllValues().get(0);
202         assertNotNull(event);
203         String[] lines = ((String) event.getData()).split("\r\n|\r|\n");
204         assertEquals(lines.length, 10);
205     }
206
207     private static String generateRandomStringOfLength(final int length) {
208         final String alphabet = "ABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789abcdefghijklmnopqrstuvxyz";
209         final StringBuilder sb = new StringBuilder(length);
210         for (int i = 0; i < length; i++) {
211             int index = (int) (alphabet.length() * Math.random());
212             sb.append(alphabet.charAt(index));
213         }
214         return sb.toString();
215     }
216 }