f470e1ba4cb67e1e040eb8aac5833fb74e4721f8
[netconf.git] / restconf / restconf-nb-rfc8040 / src / test / java / org / opendaylight / restconf / nb / rfc8040 / streams / sse / SSESessionHandlerTest.java
1 /*
2  * Copyright (c) 2020 Lumina Networks, Inc. All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.restconf.nb.rfc8040.streams.sse;
9
10 import static org.junit.Assert.assertEquals;
11 import static org.junit.Assert.assertNotNull;
12 import static org.mockito.ArgumentMatchers.any;
13 import static org.mockito.ArgumentMatchers.anyBoolean;
14 import static org.mockito.ArgumentMatchers.eq;
15 import static org.mockito.Mockito.doReturn;
16 import static org.mockito.Mockito.never;
17 import static org.mockito.Mockito.times;
18 import static org.mockito.Mockito.verify;
19 import static org.mockito.Mockito.verifyNoMoreInteractions;
20
21 import java.io.IOException;
22 import java.util.concurrent.ScheduledExecutorService;
23 import java.util.concurrent.ScheduledFuture;
24 import java.util.concurrent.TimeUnit;
25 import org.glassfish.jersey.media.sse.EventOutput;
26 import org.glassfish.jersey.media.sse.OutboundEvent;
27 import org.junit.Test;
28 import org.junit.runner.RunWith;
29 import org.mockito.ArgumentCaptor;
30 import org.mockito.Mock;
31 import org.mockito.junit.MockitoJUnitRunner;
32 import org.opendaylight.restconf.nb.rfc8040.streams.listeners.BaseListenerInterface;
33
34 @RunWith(MockitoJUnitRunner.StrictStubs.class)
35 public class SSESessionHandlerTest {
36     @Mock
37     private ScheduledExecutorService executorService;
38     @Mock
39     private BaseListenerInterface listener;
40     @Mock
41     private ScheduledFuture<?> pingFuture;
42     @Mock
43     private EventOutput eventOutput;
44
45     private SSESessionHandler setup(final int maxFragmentSize, final int heartbeatInterval) {
46         final SSESessionHandler sseSessionHandler = new SSESessionHandler(executorService, eventOutput, listener,
47             maxFragmentSize, heartbeatInterval);
48         doReturn(pingFuture).when(executorService)
49             .scheduleWithFixedDelay(any(Runnable.class), eq((long) heartbeatInterval), eq((long) heartbeatInterval),
50                 eq(TimeUnit.MILLISECONDS));
51         return sseSessionHandler;
52     }
53
54     @Test
55     public void onSSEConnectedWithEnabledPing() {
56         final int heartbeatInterval = 1000;
57         final SSESessionHandler sseSessionHandler = setup(1000, heartbeatInterval);
58
59         sseSessionHandler.init();
60         verify(listener).addSubscriber(sseSessionHandler);
61         verify(executorService).scheduleWithFixedDelay(any(Runnable.class), eq((long) heartbeatInterval),
62                 eq((long) heartbeatInterval), eq(TimeUnit.MILLISECONDS));
63     }
64
65     @Test
66     public void onSSEConnectedWithDisabledPing() {
67         final int heartbeatInterval = 0;
68         final SSESessionHandler sseSessionHandler = setup(1000, heartbeatInterval);
69
70         sseSessionHandler.init();
71         verify(listener).addSubscriber(sseSessionHandler);
72         verifyNoMoreInteractions(executorService);
73     }
74
75     @Test
76     public void onSSEClosedWithOpenSession() {
77         final SSESessionHandler sseSessionHandler = setup(200, 10000);
78
79         sseSessionHandler.init();
80         verify(listener).addSubscriber(sseSessionHandler);
81
82         sseSessionHandler.close();
83         verify(listener).removeSubscriber(sseSessionHandler);
84     }
85
86     @Test
87     public void onSSECloseWithEnabledPingAndLivingSession() throws IOException {
88         final SSESessionHandler sseSessionHandler = setup(150, 8000);
89         sseSessionHandler.init();
90         doReturn(false).when(pingFuture).isCancelled();
91         doReturn(false).when(pingFuture).isDone();
92
93         sseSessionHandler.close();
94         verify(listener).removeSubscriber(sseSessionHandler);
95         verify(pingFuture).cancel(anyBoolean());
96     }
97
98     @Test
99     public void onSSECloseWithEnabledPingAndDeadSession() {
100         final SSESessionHandler sseSessionHandler = setup(150, 8000);
101         sseSessionHandler.init();
102
103         sseSessionHandler.close();
104         verify(listener).removeSubscriber(sseSessionHandler);
105         verify(pingFuture).cancel(anyBoolean());
106     }
107
108     @Test
109     public void onSSECloseWithDisabledPingAndDeadSession() {
110         final SSESessionHandler sseSessionHandler = setup(150, 8000);
111         sseSessionHandler.init();
112         doReturn(true).when(pingFuture).isDone();
113
114         sseSessionHandler.close();
115         verify(listener).removeSubscriber(sseSessionHandler);
116         verify(pingFuture, never()).cancel(anyBoolean());
117     }
118
119     @Test
120     public void sendDataMessageWithDisabledFragmentation() throws IOException {
121         final SSESessionHandler sseSessionHandler = setup(0, 0);
122         doReturn(false).when(eventOutput).isClosed();
123         sseSessionHandler.init();
124         final String testMessage = generateRandomStringOfLength(100);
125         sseSessionHandler.sendDataMessage(testMessage);
126
127         ArgumentCaptor<OutboundEvent> cap = ArgumentCaptor.forClass(OutboundEvent.class);
128         verify(eventOutput, times(1)).write(cap.capture());
129         OutboundEvent event = cap.getAllValues().get(0);
130         assertNotNull(event);
131         assertEquals(event.getData(), testMessage);
132     }
133
134     @Test
135     public void sendDataMessageWithDisabledFragAndDeadSession() throws IOException {
136         final SSESessionHandler sseSessionHandler = setup(0, 0);
137         doReturn(true).when(eventOutput).isClosed();
138         sseSessionHandler.init();
139
140         final String testMessage = generateRandomStringOfLength(11);
141         sseSessionHandler.sendDataMessage(testMessage);
142         ArgumentCaptor<OutboundEvent> cap = ArgumentCaptor.forClass(OutboundEvent.class);
143         verify(eventOutput, times(0)).write(cap.capture());
144     }
145
146     @Test
147     public void sendDataMessageWithEnabledFragAndSmallMessage() throws IOException {
148         final SSESessionHandler sseSessionHandler = setup(100, 0);
149         doReturn(false).when(eventOutput).isClosed();
150         sseSessionHandler.init();
151
152         // in both cases, fragmentation should not be applied
153         final String testMessage1 = generateRandomStringOfLength(100);
154         final String testMessage2 = generateRandomStringOfLength(50);
155         sseSessionHandler.sendDataMessage(testMessage1);
156         sseSessionHandler.sendDataMessage(testMessage2);
157
158         ArgumentCaptor<OutboundEvent> cap = ArgumentCaptor.forClass(OutboundEvent.class);
159         // without fragmentation there will be 2 write calls
160         verify(eventOutput, times(2)).write(cap.capture());
161         OutboundEvent event1 = cap.getAllValues().get(0);
162         OutboundEvent event2 = cap.getAllValues().get(1);
163         assertNotNull(event1);
164         assertNotNull(event2);
165         assertEquals(event1.getData(), testMessage1);
166         assertEquals(event2.getData(), testMessage2);
167         String[] lines1 = ((String) event1.getData()).split("\r\n|\r|\n");
168         assertEquals(lines1.length, 1);
169         String[] lines2 = ((String) event2.getData()).split("\r\n|\r|\n");
170         assertEquals(lines2.length, 1);
171     }
172
173     @Test
174     public void sendDataMessageWithZeroLength() {
175         final SSESessionHandler sseSessionHandler = setup(100, 0);
176         sseSessionHandler.init();
177
178         sseSessionHandler.sendDataMessage("");
179         verifyNoMoreInteractions(eventOutput);
180     }
181
182     @Test
183     public void sendDataMessageWithEnabledFragAndLargeMessage1() throws IOException {
184         final SSESessionHandler sseSessionHandler = setup(100, 0);
185         doReturn(false).when(eventOutput).isClosed();
186         sseSessionHandler.init();
187
188         // there should be 10 fragments of length 100 characters
189         final String testMessage = generateRandomStringOfLength(1000);
190         sseSessionHandler.sendDataMessage(testMessage);
191         ArgumentCaptor<OutboundEvent> cap = ArgumentCaptor.forClass(OutboundEvent.class);
192         // SSE automatically send fragmented packet ended with new line character due to eventOutput
193         // have only 1 write call
194         verify(eventOutput, times(1)).write(cap.capture());
195         OutboundEvent event = cap.getAllValues().get(0);
196         assertNotNull(event);
197         String[] lines = ((String) event.getData()).split("\r\n|\r|\n");
198         assertEquals(lines.length, 10);
199     }
200
201     private static String generateRandomStringOfLength(final int length) {
202         final String alphabet = "ABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789abcdefghijklmnopqrstuvxyz";
203         final StringBuilder sb = new StringBuilder(length);
204         for (int i = 0; i < length; i++) {
205             int index = (int) (alphabet.length() * Math.random());
206             sb.append(alphabet.charAt(index));
207         }
208         return sb.toString();
209     }
210 }