2 * Copyright © 2019 FRINX s.r.o. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.restconf.nb.rfc8040.streams;
10 import static java.util.Objects.requireNonNull;
12 import com.google.common.base.Strings;
13 import java.io.IOException;
14 import java.io.UnsupportedEncodingException;
15 import java.nio.ByteBuffer;
16 import java.nio.charset.Charset;
17 import java.util.ArrayList;
18 import java.util.List;
19 import java.util.Objects;
20 import java.util.concurrent.TimeUnit;
21 import java.util.concurrent.TimeoutException;
22 import javax.xml.xpath.XPathExpressionException;
23 import org.eclipse.jdt.annotation.Nullable;
24 import org.eclipse.jetty.websocket.api.CloseException;
25 import org.eclipse.jetty.websocket.api.RemoteEndpoint;
26 import org.eclipse.jetty.websocket.api.Session;
27 import org.eclipse.jetty.websocket.api.annotations.OnWebSocketClose;
28 import org.eclipse.jetty.websocket.api.annotations.OnWebSocketConnect;
29 import org.eclipse.jetty.websocket.api.annotations.OnWebSocketError;
30 import org.eclipse.jetty.websocket.api.annotations.WebSocket;
31 import org.opendaylight.restconf.server.api.EventStreamGetParams;
32 import org.opendaylight.restconf.server.spi.RestconfStream;
33 import org.opendaylight.restconf.server.spi.RestconfStream.EncodingName;
34 import org.opendaylight.restconf.server.spi.RestconfStream.Sender;
35 import org.opendaylight.yangtools.concepts.Registration;
36 import org.slf4j.Logger;
37 import org.slf4j.LoggerFactory;
40 * Web-socket session handler that is responsible for controlling of session, managing subscription
41 * to data-change-event or notification listener, and sending of data over established web-socket session.
44 @Deprecated(since = "7.0.0", forRemoval = true)
45 final class WebSocketSender implements Sender {
46 private static final Logger LOG = LoggerFactory.getLogger(WebSocketSender.class);
47 private static final byte[] PING_PAYLOAD = "ping".getBytes(Charset.defaultCharset());
49 private final PingExecutor pingExecutor;
50 private final RestconfStream<?> stream;
51 private final EncodingName encodingName;
52 private final EventStreamGetParams params;
53 private final int maximumFragmentLength;
54 private final long heartbeatInterval;
56 private Session session;
57 private Registration subscriber;
58 private Registration pingProcess;
61 * Creation of the new web-socket session handler.
63 * @param pingExecutor Executor that is used for periodical sending of web-socket ping messages to keep
64 * session up even if the notifications doesn't flow from server to clients or clients
65 * don't implement ping-pong service.
66 * @param stream YANG notification or data-change event listener to which client on this web-socket
67 * session subscribes to.
68 * @param maximumFragmentLength Maximum fragment length in number of Unicode code units (characters).
69 * If this parameter is set to 0, the maximum fragment length is disabled and messages
70 * up to 64 KB can be sent in TCP segment (exceeded notification length ends in error).
71 * If the parameter is set to non-zero positive value, messages longer than this
72 * parameter are fragmented into multiple web-socket messages sent in one transaction.
73 * @param heartbeatInterval Interval in milliseconds of sending of ping control frames to remote endpoint
74 * to keep session up. Ping control frames are disabled if this parameter is set to 0.
76 WebSocketSender(final PingExecutor pingExecutor, final RestconfStream<?> stream, final EncodingName encodingName,
77 final @Nullable EventStreamGetParams params, final int maximumFragmentLength,
78 final long heartbeatInterval) {
79 this.pingExecutor = requireNonNull(pingExecutor);
80 this.stream = requireNonNull(stream);
81 this.encodingName = requireNonNull(encodingName);
82 // FIXME: NETCONF-1102: require params
84 this.maximumFragmentLength = maximumFragmentLength;
85 this.heartbeatInterval = heartbeatInterval;
89 * Handling of the web-socket connected event (web-socket session between local server and remote endpoint has been
90 * established). Web-socket session handler is registered at data-change-event / YANG notification listener and
91 * the heartbeat ping process is started if it is enabled.
93 * @param webSocketSession Created web-socket session.
94 * @see OnWebSocketConnect More information about invocation of this method and parameters.
97 public synchronized void onWebSocketConnected(final Session webSocketSession) {
98 if (session == null || !session.isOpen()) {
99 session = webSocketSession;
101 subscriber = stream.addSubscriber(this, encodingName, params);
102 } catch (IllegalArgumentException | XPathExpressionException | UnsupportedEncodingException e) {
103 LOG.info("Closing web-socket session {}", webSocketSession, e);
104 webSocketSession.close(404, "Unsupported encoding " + encodingName);
109 LOG.debug("A new web-socket session {} has been successfully registered.", webSocketSession);
110 if (heartbeatInterval != 0) {
111 // sending of PING frame can be long if there is an error on web-socket - from this reason
112 // the fixed-rate should not be used
113 pingProcess = pingExecutor.startPingProcess(this::sendPing, heartbeatInterval, TimeUnit.MILLISECONDS);
119 * Handling of web-socket session closed event (timeout, error, or both parties closed session). Removal
120 * of subscription at listener and stopping of the ping process.
122 * @param statusCode Web-socket status code.
123 * @param reason Reason, why the web-socket is closed (for example, reached timeout).
124 * @see OnWebSocketClose More information about invocation of this method and parameters.
127 public synchronized void onWebSocketClosed(final int statusCode, final String reason) {
128 // note: there is not guarantee that Session.isOpen() returns true - it is better to not check it here
129 // using 'session != null && session.isOpen()'
130 if (session != null) {
131 LOG.debug("Web-socket session has been closed with status code {} and reason message: {}.", statusCode,
133 if (subscriber != null) {
142 * Handling of error in web-socket implementation. Subscription at listener is removed, open session is closed
143 * and ping process is stopped.
145 * @param error Error details.
146 * @see OnWebSocketError More information about invocation of this method and parameters.
149 public synchronized void onWebSocketError(final Throwable error) {
150 if (error instanceof CloseException && error.getCause() instanceof TimeoutException timeout) {
151 // A timeout is expected, do not log the complete stack trace
152 LOG.info("Web-socket closed by timeout: {}", timeout.getMessage());
154 LOG.warn("An error occurred on web-socket: ", error);
156 if (session != null) {
157 LOG.info("Trying to close web-socket session {} gracefully after error.", session);
158 if (subscriber != null) {
162 if (session.isOpen()) {
169 private void stopPingProcess() {
170 if (pingProcess != null) {
177 public synchronized void endOfStream() {
178 if (session != null && session.isOpen()) {
185 * Sensing of string message to remote endpoint of {@link org.eclipse.jetty.websocket.api.Session}. If the maximum
186 * fragment length is set to non-zero positive value and input message exceeds this value, message is fragmented
187 * to multiple message fragments which are send individually but still in one web-socket transaction.
189 * @param message Message data to be send over web-socket session.
192 public synchronized void sendDataMessage(final String message) {
193 if (Strings.isNullOrEmpty(message)) {
194 // FIXME: should this be tolerated?
198 if (session != null && session.isOpen()) {
199 final var remoteEndpoint = session.getRemote();
200 if (maximumFragmentLength == 0 || message.length() <= maximumFragmentLength) {
201 sendDataMessage(message, remoteEndpoint);
203 sendFragmentedMessage(splitMessageToFragments(message, maximumFragmentLength), remoteEndpoint);
206 LOG.trace("Message with body '{}' is not sent because underlay web-socket session is not open.", message);
210 private void sendDataMessage(final String message, final RemoteEndpoint remoteEndpoint) {
212 remoteEndpoint.sendString(message);
213 LOG.trace("Message with body '{}' has been successfully sent to remote endpoint {}.", message,
215 } catch (IOException e) {
216 LOG.warn("Cannot send message over web-socket session {}.", session, e);
220 private void sendFragmentedMessage(final List<String> orderedFragments, final RemoteEndpoint remoteEndpoint) {
221 for (int i = 0; i < orderedFragments.size(); i++) {
222 final String fragment = orderedFragments.get(i);
223 final boolean last = i == orderedFragments.size() - 1;
226 remoteEndpoint.sendPartialString(fragment, last);
227 } catch (IOException e) {
228 LOG.warn("Cannot send message fragment number {} over web-socket session {}. All other fragments of "
229 + " the message are dropped too.", i, session, e);
232 LOG.trace("Message fragment number {} with body '{}' has been successfully sent to remote endpoint {}.", i,
233 fragment, remoteEndpoint);
237 private synchronized void sendPing() {
239 Objects.requireNonNull(session).getRemote().sendPing(ByteBuffer.wrap(PING_PAYLOAD));
240 } catch (IOException e) {
241 LOG.warn("Cannot send ping message over web-socket session {}.", session, e);
245 private static List<String> splitMessageToFragments(final String inputMessage, final int maximumFragmentLength) {
246 final var parts = new ArrayList<String>();
247 int length = inputMessage.length();
248 for (int i = 0; i < length; i += maximumFragmentLength) {
249 parts.add(inputMessage.substring(i, Math.min(length, i + maximumFragmentLength)));