6d32c4b24fa0335d860bea4b7be54bd38a550b7b
[netconf.git] / restconf / restconf-nb-rfc8040 / src / test / java / org / opendaylight / restconf / nb / rfc8040 / streams / SSESessionHandlerTest.java
1 /*
2  * Copyright (c) 2020 Lumina Networks, Inc. All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.restconf.nb.rfc8040.streams;
9
10 import static org.junit.Assert.assertEquals;
11 import static org.junit.Assert.assertNotNull;
12 import static org.mockito.ArgumentMatchers.any;
13 import static org.mockito.ArgumentMatchers.anyBoolean;
14 import static org.mockito.ArgumentMatchers.eq;
15 import static org.mockito.Mockito.doAnswer;
16 import static org.mockito.Mockito.doCallRealMethod;
17 import static org.mockito.Mockito.doReturn;
18 import static org.mockito.Mockito.never;
19 import static org.mockito.Mockito.times;
20 import static org.mockito.Mockito.verify;
21 import static org.mockito.Mockito.verifyNoMoreInteractions;
22
23 import java.io.IOException;
24 import java.util.concurrent.ScheduledExecutorService;
25 import java.util.concurrent.ScheduledFuture;
26 import java.util.concurrent.TimeUnit;
27 import javax.ws.rs.sse.Sse;
28 import javax.ws.rs.sse.SseEventSink;
29 import org.glassfish.jersey.media.sse.OutboundEvent;
30 import org.junit.Test;
31 import org.junit.runner.RunWith;
32 import org.mockito.ArgumentCaptor;
33 import org.mockito.Mock;
34 import org.mockito.junit.MockitoJUnitRunner;
35 import org.opendaylight.restconf.nb.rfc8040.streams.listeners.BaseListenerInterface;
36
37 @RunWith(MockitoJUnitRunner.StrictStubs.class)
38 public class SSESessionHandlerTest {
39     @Mock
40     private ScheduledExecutorService executorService;
41     @Mock
42     private BaseListenerInterface listener;
43     @Mock
44     private ScheduledFuture<?> pingFuture;
45     @Mock
46     private SseEventSink eventSink;
47     @Mock
48     private Sse sse;
49
50     private SSESessionHandler setup(final int maxFragmentSize, final int heartbeatInterval) {
51         doCallRealMethod().when(sse).newEvent(any());
52         doAnswer(inv -> new OutboundEvent.Builder()).when(sse).newEventBuilder();
53
54         final SSESessionHandler sseSessionHandler = new SSESessionHandler(executorService, eventSink, sse, listener,
55             maxFragmentSize, heartbeatInterval);
56         doReturn(pingFuture).when(executorService)
57             .scheduleWithFixedDelay(any(Runnable.class), eq((long) heartbeatInterval), eq((long) heartbeatInterval),
58                 eq(TimeUnit.MILLISECONDS));
59         return sseSessionHandler;
60     }
61
62     @Test
63     public void onSSEConnectedWithEnabledPing() {
64         final int heartbeatInterval = 1000;
65         final SSESessionHandler sseSessionHandler = setup(1000, heartbeatInterval);
66
67         sseSessionHandler.init();
68         verify(listener).addSubscriber(sseSessionHandler);
69         verify(executorService).scheduleWithFixedDelay(any(Runnable.class), eq((long) heartbeatInterval),
70                 eq((long) heartbeatInterval), eq(TimeUnit.MILLISECONDS));
71     }
72
73     @Test
74     public void onSSEConnectedWithDisabledPing() {
75         final int heartbeatInterval = 0;
76         final SSESessionHandler sseSessionHandler = setup(1000, heartbeatInterval);
77
78         sseSessionHandler.init();
79         verify(listener).addSubscriber(sseSessionHandler);
80         verifyNoMoreInteractions(executorService);
81     }
82
83     @Test
84     public void onSSEClosedWithOpenSession() {
85         final SSESessionHandler sseSessionHandler = setup(200, 10000);
86
87         sseSessionHandler.init();
88         verify(listener).addSubscriber(sseSessionHandler);
89
90         sseSessionHandler.close();
91         verify(listener).removeSubscriber(sseSessionHandler);
92     }
93
94     @Test
95     public void onSSECloseWithEnabledPingAndLivingSession() throws IOException {
96         final SSESessionHandler sseSessionHandler = setup(150, 8000);
97         sseSessionHandler.init();
98         doReturn(false).when(pingFuture).isCancelled();
99         doReturn(false).when(pingFuture).isDone();
100
101         sseSessionHandler.close();
102         verify(listener).removeSubscriber(sseSessionHandler);
103         verify(pingFuture).cancel(anyBoolean());
104     }
105
106     @Test
107     public void onSSECloseWithEnabledPingAndDeadSession() {
108         final SSESessionHandler sseSessionHandler = setup(150, 8000);
109         sseSessionHandler.init();
110
111         sseSessionHandler.close();
112         verify(listener).removeSubscriber(sseSessionHandler);
113         verify(pingFuture).cancel(anyBoolean());
114     }
115
116     @Test
117     public void onSSECloseWithDisabledPingAndDeadSession() {
118         final SSESessionHandler sseSessionHandler = setup(150, 8000);
119         sseSessionHandler.init();
120         doReturn(true).when(pingFuture).isDone();
121
122         sseSessionHandler.close();
123         verify(listener).removeSubscriber(sseSessionHandler);
124         verify(pingFuture, never()).cancel(anyBoolean());
125     }
126
127     @Test
128     public void sendDataMessageWithDisabledFragmentation() throws IOException {
129         final SSESessionHandler sseSessionHandler = setup(0, 0);
130         doReturn(false).when(eventSink).isClosed();
131         sseSessionHandler.init();
132         final String testMessage = generateRandomStringOfLength(100);
133         sseSessionHandler.sendDataMessage(testMessage);
134
135         ArgumentCaptor<OutboundEvent> cap = ArgumentCaptor.forClass(OutboundEvent.class);
136         verify(eventSink, times(1)).send(cap.capture());
137         OutboundEvent event = cap.getAllValues().get(0);
138         assertNotNull(event);
139         assertEquals(event.getData(), testMessage);
140     }
141
142     @Test
143     public void sendDataMessageWithDisabledFragAndDeadSession() throws IOException {
144         final SSESessionHandler sseSessionHandler = setup(0, 0);
145         doReturn(true).when(eventSink).isClosed();
146         sseSessionHandler.init();
147
148         final String testMessage = generateRandomStringOfLength(11);
149         sseSessionHandler.sendDataMessage(testMessage);
150         ArgumentCaptor<OutboundEvent> cap = ArgumentCaptor.forClass(OutboundEvent.class);
151         verify(eventSink, times(0)).send(cap.capture());
152     }
153
154     @Test
155     public void sendDataMessageWithEnabledFragAndSmallMessage() throws IOException {
156         final SSESessionHandler sseSessionHandler = setup(100, 0);
157         doReturn(false).when(eventSink).isClosed();
158         sseSessionHandler.init();
159
160         // in both cases, fragmentation should not be applied
161         final String testMessage1 = generateRandomStringOfLength(100);
162         final String testMessage2 = generateRandomStringOfLength(50);
163         sseSessionHandler.sendDataMessage(testMessage1);
164         sseSessionHandler.sendDataMessage(testMessage2);
165
166         ArgumentCaptor<OutboundEvent> cap = ArgumentCaptor.forClass(OutboundEvent.class);
167         // without fragmentation there will be 2 write calls
168         verify(eventSink, times(2)).send(cap.capture());
169         OutboundEvent event1 = cap.getAllValues().get(0);
170         OutboundEvent event2 = cap.getAllValues().get(1);
171         assertNotNull(event1);
172         assertNotNull(event2);
173         assertEquals(event1.getData(), testMessage1);
174         assertEquals(event2.getData(), testMessage2);
175         String[] lines1 = ((String) event1.getData()).split("\r\n|\r|\n");
176         assertEquals(lines1.length, 1);
177         String[] lines2 = ((String) event2.getData()).split("\r\n|\r|\n");
178         assertEquals(lines2.length, 1);
179     }
180
181     @Test
182     public void sendDataMessageWithZeroLength() {
183         final SSESessionHandler sseSessionHandler = setup(100, 0);
184         sseSessionHandler.init();
185
186         sseSessionHandler.sendDataMessage("");
187         verifyNoMoreInteractions(eventSink);
188     }
189
190     @Test
191     public void sendDataMessageWithEnabledFragAndLargeMessage1() throws IOException {
192         final SSESessionHandler sseSessionHandler = setup(100, 0);
193         doReturn(false).when(eventSink).isClosed();
194         sseSessionHandler.init();
195
196         // there should be 10 fragments of length 100 characters
197         final String testMessage = generateRandomStringOfLength(1000);
198         sseSessionHandler.sendDataMessage(testMessage);
199         ArgumentCaptor<OutboundEvent> cap = ArgumentCaptor.forClass(OutboundEvent.class);
200         // SSE automatically send fragmented packet ended with new line character due to eventOutput
201         // have only 1 write call
202         verify(eventSink, times(1)).send(cap.capture());
203         OutboundEvent event = cap.getAllValues().get(0);
204         assertNotNull(event);
205         String[] lines = ((String) event.getData()).split("\r\n|\r|\n");
206         assertEquals(lines.length, 10);
207     }
208
209     private static String generateRandomStringOfLength(final int length) {
210         final String alphabet = "ABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789abcdefghijklmnopqrstuvxyz";
211         final StringBuilder sb = new StringBuilder(length);
212         for (int i = 0; i < length; i++) {
213             int index = (int) (alphabet.length() * Math.random());
214             sb.append(alphabet.charAt(index));
215         }
216         return sb.toString();
217     }
218 }