Improve subscriber tracking
[netconf.git] / restconf / restconf-nb / src / test / java / org / opendaylight / restconf / nb / rfc8040 / streams / WebSocketSessionHandlerTest.java
1 /*
2  * Copyright © 2019 FRINX s.r.o. All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.restconf.nb.rfc8040.streams;
9
10 import static org.junit.Assert.assertEquals;
11 import static org.junit.Assert.assertTrue;
12 import static org.mockito.ArgumentMatchers.any;
13 import static org.mockito.ArgumentMatchers.anyBoolean;
14 import static org.mockito.ArgumentMatchers.anyString;
15 import static org.mockito.ArgumentMatchers.eq;
16 import static org.mockito.Mockito.doNothing;
17 import static org.mockito.Mockito.doReturn;
18 import static org.mockito.Mockito.mock;
19 import static org.mockito.Mockito.never;
20 import static org.mockito.Mockito.times;
21 import static org.mockito.Mockito.verify;
22 import static org.mockito.Mockito.verifyNoMoreInteractions;
23 import static org.mockito.Mockito.when;
24
25 import java.io.IOException;
26 import java.util.List;
27 import java.util.concurrent.ScheduledExecutorService;
28 import java.util.concurrent.ScheduledFuture;
29 import java.util.concurrent.TimeUnit;
30 import org.eclipse.jetty.websocket.api.RemoteEndpoint;
31 import org.eclipse.jetty.websocket.api.Session;
32 import org.junit.Test;
33 import org.mockito.ArgumentCaptor;
34 import org.opendaylight.yangtools.concepts.Registration;
35
36 public class WebSocketSessionHandlerTest {
37     private static final class WebSocketTestSessionState {
38         private final RestconfStream<?> listener;
39         private final ScheduledExecutorService executorService;
40         private final WebSocketSessionHandler webSocketSessionHandler;
41         private final int heartbeatInterval;
42         private final int maxFragmentSize;
43         private final ScheduledFuture pingFuture;
44
45         WebSocketTestSessionState(final int maxFragmentSize, final int heartbeatInterval) {
46             listener = mock(RestconfStream.class);
47             executorService = mock(ScheduledExecutorService.class);
48             this.heartbeatInterval = heartbeatInterval;
49             this.maxFragmentSize = maxFragmentSize;
50             webSocketSessionHandler = new WebSocketSessionHandler(executorService, listener, maxFragmentSize,
51                     heartbeatInterval);
52             pingFuture = mock(ScheduledFuture.class);
53             when(executorService.scheduleWithFixedDelay(any(Runnable.class), eq((long) heartbeatInterval),
54                 eq((long) heartbeatInterval), eq(TimeUnit.MILLISECONDS))).thenReturn(pingFuture);
55         }
56     }
57
58     @Test
59     public void onWebSocketConnectedWithEnabledPing() {
60         final int heartbeatInterval = 1000;
61         final WebSocketTestSessionState webSocketTestSessionState = new WebSocketTestSessionState(
62                 1000, heartbeatInterval);
63         final Session session = mock(Session.class);
64
65         webSocketTestSessionState.webSocketSessionHandler.onWebSocketConnected(session);
66         verify(webSocketTestSessionState.listener).addSubscriber(
67                 webSocketTestSessionState.webSocketSessionHandler);
68         verify(webSocketTestSessionState.executorService).scheduleWithFixedDelay(any(Runnable.class),
69                 eq((long) webSocketTestSessionState.heartbeatInterval),
70                 eq((long) webSocketTestSessionState.heartbeatInterval), eq(TimeUnit.MILLISECONDS));
71     }
72
73     @Test
74     public void onWebSocketConnectedWithDisabledPing() {
75         final int heartbeatInterval = 0;
76         final WebSocketTestSessionState webSocketTestSessionState = new WebSocketTestSessionState(
77                 1000, heartbeatInterval);
78         final Session session = mock(Session.class);
79
80         webSocketTestSessionState.webSocketSessionHandler.onWebSocketConnected(session);
81         verify(webSocketTestSessionState.listener).addSubscriber(
82                 webSocketTestSessionState.webSocketSessionHandler);
83         verifyNoMoreInteractions(webSocketTestSessionState.executorService);
84     }
85
86     @Test
87     public void onWebSocketConnectedWithAlreadyOpenSession() {
88         final var webSocketTestSessionState = new WebSocketTestSessionState(150, 8000);
89         final var session = mock(Session.class);
90         when(session.isOpen()).thenReturn(true);
91
92         webSocketTestSessionState.webSocketSessionHandler.onWebSocketConnected(session);
93         webSocketTestSessionState.webSocketSessionHandler.onWebSocketConnected(session);
94         verify(webSocketTestSessionState.listener).addSubscriber(any());
95     }
96
97     @Test
98     public void onWebSocketClosedWithOpenSession() {
99         final var webSocketTestSessionState = new WebSocketTestSessionState(200, 10000);
100         final var session = mock(Session.class);
101         final var reg = mock(Registration.class);
102
103         doReturn(reg).when(webSocketTestSessionState.listener)
104             .addSubscriber(webSocketTestSessionState.webSocketSessionHandler);
105         webSocketTestSessionState.webSocketSessionHandler.onWebSocketConnected(session);
106         verify(webSocketTestSessionState.listener).addSubscriber(webSocketTestSessionState.webSocketSessionHandler);
107
108         webSocketTestSessionState.webSocketSessionHandler.onWebSocketClosed(200, "Simulated close");
109         verify(reg).close();
110     }
111
112     @Test
113     public void onWebSocketClosedWithNotInitialisedSession() {
114         final WebSocketTestSessionState webSocketTestSessionState = new WebSocketTestSessionState(300, 12000);
115         webSocketTestSessionState.webSocketSessionHandler.onWebSocketClosed(500, "Simulated close");
116         verifyNoMoreInteractions(webSocketTestSessionState.listener);
117     }
118
119     @Test
120     public void onWebSocketErrorWithEnabledPingAndLivingSession() {
121         final var webSocketTestSessionState = new WebSocketTestSessionState(150, 8000);
122         final var session = mock(Session.class);
123         final var reg = mock(Registration.class);
124
125         when(session.isOpen()).thenReturn(true);
126         when(webSocketTestSessionState.listener.addSubscriber(webSocketTestSessionState.webSocketSessionHandler))
127             .thenReturn(reg);
128         webSocketTestSessionState.webSocketSessionHandler.onWebSocketConnected(session);
129         when(webSocketTestSessionState.pingFuture.isCancelled()).thenReturn(false);
130         when(webSocketTestSessionState.pingFuture.isDone()).thenReturn(false);
131
132         final var sampleError = new IllegalStateException("Simulated error");
133         doNothing().when(reg).close();
134         webSocketTestSessionState.webSocketSessionHandler.onWebSocketError(sampleError);
135         verify(reg).close();
136         verify(session).close();
137         verify(webSocketTestSessionState.pingFuture).cancel(anyBoolean());
138     }
139
140     @Test
141     public void onWebSocketErrorWithEnabledPingAndDeadSession() {
142         final var webSocketTestSessionState = new WebSocketTestSessionState(150, 8000);
143         final var session = mock(Session.class);
144         final var reg = mock(Registration.class);
145
146         when(session.isOpen()).thenReturn(false);
147         when(webSocketTestSessionState.listener.addSubscriber(webSocketTestSessionState.webSocketSessionHandler))
148             .thenReturn(reg);
149         webSocketTestSessionState.webSocketSessionHandler.onWebSocketConnected(session);
150
151         final var sampleError = new IllegalStateException("Simulated error");
152         webSocketTestSessionState.webSocketSessionHandler.onWebSocketError(sampleError);
153         verify(reg).close();
154         verify(session, never()).close();
155         verify(webSocketTestSessionState.pingFuture).cancel(anyBoolean());
156     }
157
158     @Test
159     public void onWebSocketErrorWithDisabledPingAndDeadSession() {
160         final var webSocketTestSessionState = new WebSocketTestSessionState(150, 8000);
161         final var session = mock(Session.class);
162         final var reg = mock(Registration.class);
163
164         when(session.isOpen()).thenReturn(false);
165         when(webSocketTestSessionState.listener.addSubscriber(webSocketTestSessionState.webSocketSessionHandler))
166             .thenReturn(reg);
167         webSocketTestSessionState.webSocketSessionHandler.onWebSocketConnected(session);
168         when(webSocketTestSessionState.pingFuture.isCancelled()).thenReturn(false);
169         when(webSocketTestSessionState.pingFuture.isDone()).thenReturn(true);
170
171         final var sampleError = new IllegalStateException("Simulated error");
172         webSocketTestSessionState.webSocketSessionHandler.onWebSocketError(sampleError);
173         verify(reg).close();
174         verify(session, never()).close();
175         verify(webSocketTestSessionState.pingFuture, never()).cancel(anyBoolean());
176     }
177
178     @Test
179     public void sendDataMessageWithDisabledFragmentation() throws IOException {
180         final var webSocketTestSessionState = new WebSocketTestSessionState(0, 0);
181         final var session = mock(Session.class);
182         final var remoteEndpoint = mock(RemoteEndpoint.class);
183         when(session.isOpen()).thenReturn(true);
184         when(session.getRemote()).thenReturn(remoteEndpoint);
185         webSocketTestSessionState.webSocketSessionHandler.onWebSocketConnected(session);
186
187         final String testMessage = generateRandomStringOfLength(100);
188         webSocketTestSessionState.webSocketSessionHandler.sendDataMessage(testMessage);
189         verify(remoteEndpoint).sendString(testMessage);
190     }
191
192     @Test
193     public void sendDataMessageWithDisabledFragAndDeadSession() {
194         final WebSocketTestSessionState webSocketTestSessionState = new WebSocketTestSessionState(0, 0);
195         final Session session = mock(Session.class);
196         final RemoteEndpoint remoteEndpoint = mock(RemoteEndpoint.class);
197         when(session.isOpen()).thenReturn(false);
198         when(session.getRemote()).thenReturn(remoteEndpoint);
199         webSocketTestSessionState.webSocketSessionHandler.onWebSocketConnected(session);
200
201         final String testMessage = generateRandomStringOfLength(11);
202         webSocketTestSessionState.webSocketSessionHandler.sendDataMessage(testMessage);
203         verifyNoMoreInteractions(remoteEndpoint);
204     }
205
206     @Test
207     public void sendDataMessageWithEnabledFragAndSmallMessage() throws IOException {
208         final WebSocketTestSessionState webSocketTestSessionState = new WebSocketTestSessionState(100, 0);
209         final Session session = mock(Session.class);
210         final RemoteEndpoint remoteEndpoint = mock(RemoteEndpoint.class);
211         when(session.isOpen()).thenReturn(true);
212         when(session.getRemote()).thenReturn(remoteEndpoint);
213         webSocketTestSessionState.webSocketSessionHandler.onWebSocketConnected(session);
214
215         // in both cases, fragmentation should not be applied
216         final String testMessage1 = generateRandomStringOfLength(100);
217         final String testMessage2 = generateRandomStringOfLength(50);
218         webSocketTestSessionState.webSocketSessionHandler.sendDataMessage(testMessage1);
219         webSocketTestSessionState.webSocketSessionHandler.sendDataMessage(testMessage2);
220         verify(remoteEndpoint).sendString(testMessage1);
221         verify(remoteEndpoint).sendString(testMessage2);
222         verify(remoteEndpoint, never()).sendPartialString(anyString(), anyBoolean());
223     }
224
225     @Test
226     public void sendDataMessageWithZeroLength() {
227         final WebSocketTestSessionState webSocketTestSessionState = new WebSocketTestSessionState(100, 0);
228         final Session session = mock(Session.class);
229         final RemoteEndpoint remoteEndpoint = mock(RemoteEndpoint.class);
230         when(session.isOpen()).thenReturn(true);
231         when(session.getRemote()).thenReturn(remoteEndpoint);
232         webSocketTestSessionState.webSocketSessionHandler.onWebSocketConnected(session);
233
234         webSocketTestSessionState.webSocketSessionHandler.sendDataMessage("");
235         verifyNoMoreInteractions(remoteEndpoint);
236     }
237
238     @Test
239     public void sendDataMessageWithEnabledFragAndLargeMessage1() throws IOException {
240         final WebSocketTestSessionState webSocketTestSessionState = new WebSocketTestSessionState(100, 0);
241         final Session session = mock(Session.class);
242         final RemoteEndpoint remoteEndpoint = mock(RemoteEndpoint.class);
243         when(session.isOpen()).thenReturn(true);
244         when(session.getRemote()).thenReturn(remoteEndpoint);
245         webSocketTestSessionState.webSocketSessionHandler.onWebSocketConnected(session);
246
247         // there should be 10 fragments of length 100 characters
248         final String testMessage = generateRandomStringOfLength(1000);
249         webSocketTestSessionState.webSocketSessionHandler.sendDataMessage(testMessage);
250         final ArgumentCaptor<String> messageCaptor = ArgumentCaptor.forClass(String.class);
251         final ArgumentCaptor<Boolean> isLastCaptor = ArgumentCaptor.forClass(Boolean.class);
252         verify(remoteEndpoint, times(10)).sendPartialString(
253                 messageCaptor.capture(), isLastCaptor.capture());
254
255         final List<String> allMessages = messageCaptor.getAllValues();
256         final List<Boolean> isLastFlags = isLastCaptor.getAllValues();
257         assertTrue(allMessages.stream().allMatch(s -> s.length() == webSocketTestSessionState.maxFragmentSize));
258         assertTrue(isLastFlags.subList(0, 9).stream().noneMatch(isLast -> isLast));
259         assertTrue(isLastFlags.get(9));
260     }
261
262     @Test
263     public void sendDataMessageWithEnabledFragAndLargeMessage2() throws IOException {
264         final WebSocketTestSessionState webSocketTestSessionState = new WebSocketTestSessionState(100, 0);
265         final Session session = mock(Session.class);
266         final RemoteEndpoint remoteEndpoint = mock(RemoteEndpoint.class);
267         when(session.isOpen()).thenReturn(true);
268         when(session.getRemote()).thenReturn(remoteEndpoint);
269         webSocketTestSessionState.webSocketSessionHandler.onWebSocketConnected(session);
270
271         // there should be 10 fragments, the last fragment should be the shortest one
272         final String testMessage = generateRandomStringOfLength(950);
273         webSocketTestSessionState.webSocketSessionHandler.sendDataMessage(testMessage);
274         final ArgumentCaptor<String> messageCaptor = ArgumentCaptor.forClass(String.class);
275         final ArgumentCaptor<Boolean> isLastCaptor = ArgumentCaptor.forClass(Boolean.class);
276         verify(remoteEndpoint, times(10)).sendPartialString(
277                 messageCaptor.capture(), isLastCaptor.capture());
278
279         final List<String> allMessages = messageCaptor.getAllValues();
280         final List<Boolean> isLastFlags = isLastCaptor.getAllValues();
281         assertTrue(allMessages.subList(0, 9).stream().allMatch(s ->
282                 s.length() == webSocketTestSessionState.maxFragmentSize));
283         assertEquals(50, allMessages.get(9).length());
284         assertTrue(isLastFlags.subList(0, 9).stream().noneMatch(isLast -> isLast));
285         assertTrue(isLastFlags.get(9));
286     }
287
288     private static String generateRandomStringOfLength(final int length) {
289         final String alphabet = "ABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789abcdefghijklmnopqrstuvxyz";
290         final StringBuilder sb = new StringBuilder(length);
291         for (int i = 0; i < length; i++) {
292             int index = (int) (alphabet.length() * Math.random());
293             sb.append(alphabet.charAt(index));
294         }
295         return sb.toString();
296     }
297 }