Introduce restconf.server.{api,spi,mdsal}
[netconf.git] / restconf / restconf-nb / src / main / java / org / opendaylight / restconf / nb / rfc8040 / streams / WebSocketSender.java
1 /*
2  * Copyright © 2019 FRINX s.r.o. All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.restconf.nb.rfc8040.streams;
9
10 import static java.util.Objects.requireNonNull;
11
12 import com.google.common.base.Strings;
13 import java.io.IOException;
14 import java.io.UnsupportedEncodingException;
15 import java.nio.ByteBuffer;
16 import java.nio.charset.Charset;
17 import java.util.ArrayList;
18 import java.util.List;
19 import java.util.Objects;
20 import java.util.concurrent.TimeUnit;
21 import java.util.concurrent.TimeoutException;
22 import javax.xml.xpath.XPathExpressionException;
23 import org.eclipse.jdt.annotation.Nullable;
24 import org.eclipse.jetty.websocket.api.CloseException;
25 import org.eclipse.jetty.websocket.api.RemoteEndpoint;
26 import org.eclipse.jetty.websocket.api.Session;
27 import org.eclipse.jetty.websocket.api.annotations.OnWebSocketClose;
28 import org.eclipse.jetty.websocket.api.annotations.OnWebSocketConnect;
29 import org.eclipse.jetty.websocket.api.annotations.OnWebSocketError;
30 import org.eclipse.jetty.websocket.api.annotations.WebSocket;
31 import org.opendaylight.restconf.nb.rfc8040.ReceiveEventsParams;
32 import org.opendaylight.restconf.server.spi.RestconfStream;
33 import org.opendaylight.restconf.server.spi.RestconfStream.EncodingName;
34 import org.opendaylight.restconf.server.spi.RestconfStream.Sender;
35 import org.opendaylight.yangtools.concepts.Registration;
36 import org.slf4j.Logger;
37 import org.slf4j.LoggerFactory;
38
39 /**
40  * Web-socket session handler that is responsible for controlling of session, managing subscription
41  * to data-change-event or notification listener, and sending of data over established web-socket session.
42  */
43 @WebSocket
44 final class WebSocketSender implements Sender {
45     private static final Logger LOG = LoggerFactory.getLogger(WebSocketSender.class);
46     private static final byte[] PING_PAYLOAD = "ping".getBytes(Charset.defaultCharset());
47
48     private final PingExecutor pingExecutor;
49     private final RestconfStream<?> stream;
50     private final EncodingName encodingName;
51     private final ReceiveEventsParams params;
52     private final int maximumFragmentLength;
53     private final long heartbeatInterval;
54
55     private Session session;
56     private Registration subscriber;
57     private Registration pingProcess;
58
59     /**
60      * Creation of the new web-socket session handler.
61      *
62      * @param pingExecutor          Executor that is used for periodical sending of web-socket ping messages to keep
63      *                              session up even if the notifications doesn't flow from server to clients or clients
64      *                              don't implement ping-pong service.
65      * @param stream                YANG notification or data-change event listener to which client on this web-socket
66      *                              session subscribes to.
67      * @param maximumFragmentLength Maximum fragment length in number of Unicode code units (characters).
68      *                              If this parameter is set to 0, the maximum fragment length is disabled and messages
69      *                              up to 64 KB can be sent in TCP segment (exceeded notification length ends in error).
70      *                              If the parameter is set to non-zero positive value, messages longer than this
71      *                              parameter are fragmented into multiple web-socket messages sent in one transaction.
72      * @param heartbeatInterval     Interval in milliseconds of sending of ping control frames to remote endpoint
73      *                              to keep session up. Ping control frames are disabled if this parameter is set to 0.
74      */
75     WebSocketSender(final PingExecutor pingExecutor, final RestconfStream<?> stream, final EncodingName encodingName,
76             final @Nullable ReceiveEventsParams params, final int maximumFragmentLength, final long heartbeatInterval) {
77         this.pingExecutor = requireNonNull(pingExecutor);
78         this.stream = requireNonNull(stream);
79         this.encodingName = requireNonNull(encodingName);
80         // FIXME: NETCONF-1102: require params
81         this.params = params;
82         this.maximumFragmentLength = maximumFragmentLength;
83         this.heartbeatInterval = heartbeatInterval;
84     }
85
86     /**
87      * Handling of the web-socket connected event (web-socket session between local server and remote endpoint has been
88      * established). Web-socket session handler is registered at data-change-event / YANG notification listener and
89      * the heartbeat ping process is started if it is enabled.
90      *
91      * @param webSocketSession Created web-socket session.
92      * @see OnWebSocketConnect More information about invocation of this method and parameters.
93      */
94     @OnWebSocketConnect
95     public synchronized void onWebSocketConnected(final Session webSocketSession) {
96         if (session == null || !session.isOpen()) {
97             session = webSocketSession;
98             try {
99                 subscriber = stream.addSubscriber(this, encodingName, params);
100             } catch (IllegalArgumentException | XPathExpressionException | UnsupportedEncodingException e) {
101                 LOG.info("Closing web-socket session {}", webSocketSession, e);
102                 webSocketSession.close(404, "Unsupported encoding " + encodingName);
103                 session = null;
104                 return;
105             }
106
107             LOG.debug("A new web-socket session {} has been successfully registered.", webSocketSession);
108             if (heartbeatInterval != 0) {
109                 // sending of PING frame can be long if there is an error on web-socket - from this reason
110                 // the fixed-rate should not be used
111                 pingProcess = pingExecutor.startPingProcess(this::sendPing, heartbeatInterval, TimeUnit.MILLISECONDS);
112             }
113         }
114     }
115
116     /**
117      * Handling of web-socket session closed event (timeout, error, or both parties closed session). Removal
118      * of subscription at listener and stopping of the ping process.
119      *
120      * @param statusCode Web-socket status code.
121      * @param reason     Reason, why the web-socket is closed (for example, reached timeout).
122      * @see OnWebSocketClose More information about invocation of this method and parameters.
123      */
124     @OnWebSocketClose
125     public synchronized void onWebSocketClosed(final int statusCode, final String reason) {
126         // note: there is not guarantee that Session.isOpen() returns true - it is better to not check it here
127         // using 'session != null && session.isOpen()'
128         if (session != null) {
129             LOG.debug("Web-socket session has been closed with status code {} and reason message: {}.", statusCode,
130                 reason);
131             if (subscriber != null) {
132                 subscriber.close();
133                 subscriber = null;
134             }
135             stopPingProcess();
136         }
137     }
138
139     /**
140      * Handling of error in web-socket implementation. Subscription at listener is removed, open session is closed
141      * and ping process is stopped.
142      *
143      * @param error Error details.
144      * @see OnWebSocketError More information about invocation of this method and parameters.
145      */
146     @OnWebSocketError
147     public synchronized void onWebSocketError(final Throwable error) {
148         if (error instanceof CloseException && error.getCause() instanceof TimeoutException timeout) {
149             // A timeout is expected, do not log the complete stack trace
150             LOG.info("Web-socket closed by timeout: {}", timeout.getMessage());
151         } else {
152             LOG.warn("An error occurred on web-socket: ", error);
153         }
154         if (session != null) {
155             LOG.info("Trying to close web-socket session {} gracefully after error.", session);
156             if (subscriber != null) {
157                 subscriber.close();
158                 subscriber = null;
159             }
160             if (session.isOpen()) {
161                 session.close();
162             }
163             stopPingProcess();
164         }
165     }
166
167     private void stopPingProcess() {
168         if (pingProcess != null) {
169             pingProcess.close();
170             pingProcess = null;
171         }
172     }
173
174     @Override
175     public synchronized void endOfStream() {
176         if (session != null && session.isOpen()) {
177             session.close();
178         }
179         stopPingProcess();
180     }
181
182     /**
183      * Sensing of string message to remote endpoint of {@link org.eclipse.jetty.websocket.api.Session}. If the maximum
184      * fragment length is set to non-zero positive value and input message exceeds this value, message is fragmented
185      * to multiple message fragments which are send individually but still in one web-socket transaction.
186      *
187      * @param message Message data to be send over web-socket session.
188      */
189     @Override
190     public synchronized void sendDataMessage(final String message) {
191         if (Strings.isNullOrEmpty(message)) {
192             // FIXME: should this be tolerated?
193             return;
194         }
195
196         if (session != null && session.isOpen()) {
197             final var remoteEndpoint = session.getRemote();
198             if (maximumFragmentLength == 0 || message.length() <= maximumFragmentLength) {
199                 sendDataMessage(message, remoteEndpoint);
200             } else {
201                 sendFragmentedMessage(splitMessageToFragments(message, maximumFragmentLength), remoteEndpoint);
202             }
203         } else {
204             LOG.trace("Message with body '{}' is not sent because underlay web-socket session is not open.", message);
205         }
206     }
207
208     private void sendDataMessage(final String message, final RemoteEndpoint remoteEndpoint) {
209         try {
210             remoteEndpoint.sendString(message);
211             LOG.trace("Message with body '{}' has been successfully sent to remote endpoint {}.", message,
212                 remoteEndpoint);
213         } catch (IOException e) {
214             LOG.warn("Cannot send message over web-socket session {}.", session, e);
215         }
216     }
217
218     private void sendFragmentedMessage(final List<String> orderedFragments, final RemoteEndpoint remoteEndpoint) {
219         for (int i = 0; i < orderedFragments.size(); i++) {
220             final String fragment = orderedFragments.get(i);
221             final boolean last = i == orderedFragments.size() - 1;
222
223             try {
224                 remoteEndpoint.sendPartialString(fragment, last);
225             } catch (IOException e) {
226                 LOG.warn("Cannot send message fragment number {} over web-socket session {}. All other fragments of "
227                     + " the message are dropped too.", i, session, e);
228                 return;
229             }
230             LOG.trace("Message fragment number {} with body '{}' has been successfully sent to remote endpoint {}.", i,
231                 fragment, remoteEndpoint);
232         }
233     }
234
235     private synchronized void sendPing() {
236         try {
237             Objects.requireNonNull(session).getRemote().sendPing(ByteBuffer.wrap(PING_PAYLOAD));
238         } catch (IOException e) {
239             LOG.warn("Cannot send ping message over web-socket session {}.", session, e);
240         }
241     }
242
243     private static List<String> splitMessageToFragments(final String inputMessage, final int maximumFragmentLength) {
244         final var parts = new ArrayList<String>();
245         int length = inputMessage.length();
246         for (int i = 0; i < length; i += maximumFragmentLength) {
247             parts.add(inputMessage.substring(i, Math.min(length, i + maximumFragmentLength)));
248         }
249         return parts;
250     }
251 }