/*
 * Copyright © 2019 FRINX s.r.o. All rights reserved.
 *
 * This program and the accompanying materials are made available under the
 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
 * and is available at http://www.eclipse.org/legal/epl-v10.html
 */
9 package org.opendaylight.restconf.nb.rfc8040.streams.websockets;
import com.google.common.base.Strings;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import org.eclipse.jetty.websocket.api.RemoteEndpoint;
import org.eclipse.jetty.websocket.api.Session;
import org.eclipse.jetty.websocket.api.annotations.OnWebSocketClose;
import org.eclipse.jetty.websocket.api.annotations.OnWebSocketConnect;
import org.eclipse.jetty.websocket.api.annotations.OnWebSocketError;
import org.eclipse.jetty.websocket.api.annotations.WebSocket;
import org.opendaylight.restconf.nb.rfc8040.streams.listeners.BaseListenerInterface;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
34 * Web-socket session handler that is responsible for controlling of session, managing subscription
35 * to data-change-event or notification listener, and sending of data over established web-socket session.
38 public class WebSocketSessionHandler {
40 private static final Logger LOG = LoggerFactory.getLogger(WebSocketSessionHandler.class);
41 private static final byte[] PING_PAYLOAD = "ping".getBytes(Charset.defaultCharset());
43 private final ScheduledExecutorService executorService;
44 private final BaseListenerInterface listener;
45 private final int maximumFragmentLength;
46 private final int heartbeatInterval;
48 private Session session;
49 private ScheduledFuture<?> pingProcess;
52 * Creation of the new web-socket session handler.
54 * @param executorService Executor that is used for periodical sending of web-socket ping messages to keep
55 * session up even if the notifications doesn't flow from server to clients or clients
56 * don't implement ping-pong service.
57 * @param listener YANG notification or data-change event listener to which client on this web-socket
58 * session subscribes to.
59 * @param maximumFragmentLength Maximum fragment length in number of Unicode code units (characters).
60 * If this parameter is set to 0, the maximum fragment length is disabled and messages
61 * up to 64 KB can be sent in TCP segment (exceeded notification length ends in error).
62 * If the parameter is set to non-zero positive value, messages longer than this
63 * parameter are fragmented into multiple web-socket messages sent in one transaction.
64 * @param heartbeatInterval Interval in milliseconds of sending of ping control frames to remote endpoint
65 * to keep session up. Ping control frames are disabled if this parameter is set to 0.
67 WebSocketSessionHandler(final ScheduledExecutorService executorService, final BaseListenerInterface listener,
68 final int maximumFragmentLength, final int heartbeatInterval) {
69 this.executorService = executorService;
70 this.listener = listener;
71 this.maximumFragmentLength = maximumFragmentLength;
72 this.heartbeatInterval = heartbeatInterval;
76 * Handling of the web-socket connected event (web-socket session between local server and remote endpoint has been
77 * established). Web-socket session handler is registered at data-change-event / YANG notification listener and
78 * the heartbeat ping process is started if it is enabled.
80 * @param webSocketSession Created web-socket session.
81 * @see OnWebSocketConnect More information about invocation of this method and parameters.
84 public synchronized void onWebSocketConnected(final Session webSocketSession) {
85 if (session == null || !session.isOpen()) {
86 this.session = webSocketSession;
87 listener.addSubscriber(this);
88 LOG.debug("A new web-socket session {} has been successfully registered.", webSocketSession);
89 if (heartbeatInterval != 0) {
90 // sending of PING frame can be long if there is an error on web-socket - from this reason
91 // the fixed-rate should not be used
92 pingProcess = executorService.scheduleWithFixedDelay(this::sendPingMessage, heartbeatInterval,
93 heartbeatInterval, TimeUnit.MILLISECONDS);
99 * Handling of web-socket session closed event (timeout, error, or both parties closed session). Removal
100 * of subscription at listener and stopping of the ping process.
102 * @param statusCode Web-socket status code.
103 * @param reason Reason, why the web-socket is closed (for example, reached timeout).
104 * @see OnWebSocketClose More information about invocation of this method and parameters.
107 public synchronized void onWebSocketClosed(final int statusCode, final String reason) {
108 // note: there is not guarantee that Session.isOpen() returns true - it is better to not check it here
109 // using 'session != null && session.isOpen()'
110 if (session != null) {
111 LOG.debug("Web-socket session has been closed with status code {} and reason message: {}.",
113 listener.removeSubscriber(this);
119 * Handling of error in web-socket implementation. Subscription at listener is removed, open session is closed
120 * and ping process is stopped.
122 * @param error Error details.
123 * @see OnWebSocketError More information about invocation of this method and parameters.
126 public synchronized void onWebSocketError(final Throwable error) {
127 LOG.warn("An error occurred on web-socket: ", error);
128 if (session != null) {
129 LOG.warn("Trying to close web-socket session {} gracefully after error.", session);
130 listener.removeSubscriber(this);
131 if (session.isOpen()) {
138 private void stopPingProcess() {
139 if (pingProcess != null && !pingProcess.isDone() && !pingProcess.isCancelled()) {
140 pingProcess.cancel(true);
145 * Sensing of string message to remote endpoint of {@link org.eclipse.jetty.websocket.api.Session}. If the maximum
146 * fragment length is set to non-zero positive value and input message exceeds this value, message is fragmented
147 * to multiple message fragments which are send individually but still in one web-socket transaction.
149 * @param message Message data to be send over web-socket session.
151 public synchronized void sendDataMessage(final String message) {
152 if (Strings.isNullOrEmpty(message)) {
153 // FIXME: should this be tolerated?
157 if (session != null && session.isOpen()) {
158 final RemoteEndpoint remoteEndpoint = session.getRemote();
159 if (maximumFragmentLength != 0) {
160 if (message.length() <= maximumFragmentLength) {
161 sendDataMessage(message, remoteEndpoint);
163 sendFragmentedMessage(splitMessageToFragments(message, maximumFragmentLength), remoteEndpoint);
166 sendDataMessage(message, remoteEndpoint);
169 LOG.trace("Message with body '{}' is not sent because underlay web-socket session is not open.", message);
173 private void sendDataMessage(final String message, final RemoteEndpoint remoteEndpoint) {
175 remoteEndpoint.sendString(message);
176 LOG.trace("Message with body '{}' has been successfully sent to remote endpoint {}.", message,
178 } catch (IOException e) {
179 LOG.warn("Cannot send message over web-socket session {}.", session, e);
183 private void sendFragmentedMessage(final List<String> orderedFragments, final RemoteEndpoint remoteEndpoint) {
184 for (int i = 0; i < orderedFragments.size(); i++) {
185 final String fragment = orderedFragments.get(i);
186 final boolean last = i == orderedFragments.size() - 1;
189 remoteEndpoint.sendPartialString(fragment, last);
190 } catch (IOException e) {
191 LOG.warn("Cannot send message fragment number {} over web-socket session {}. All other fragments of "
192 + " the message are dropped too.", i, session, e);
195 LOG.trace("Message fragment number {} with body '{}' has been successfully sent to remote endpoint {}.", i,
196 fragment, remoteEndpoint);
200 private synchronized void sendPingMessage() {
202 Objects.requireNonNull(session).getRemote().sendPing(ByteBuffer.wrap(PING_PAYLOAD));
203 } catch (IOException e) {
204 LOG.warn("Cannot send ping message over web-socket session {}.", session, e);
208 private static List<String> splitMessageToFragments(final String inputMessage, final int maximumFragmentLength) {
209 final List<String> parts = new ArrayList<>();
210 int length = inputMessage.length();
211 for (int i = 0; i < length; i += maximumFragmentLength) {
212 parts.add(inputMessage.substring(i, Math.min(length, i + maximumFragmentLength)));
218 * Get remote endpoint address of the current web-socket session.
220 * @return If the session exists and is open the {@link InetSocketAddress} wrapped in {@link Optional} is returned.
221 * Otherwise, {@link Optional#empty()} is returned.
223 public synchronized Optional<InetSocketAddress> getRemoteEndpointAddress() {
224 if (session != null && session.isOpen()) {
225 return Optional.of(session.getRemote().getInetSocketAddress());
227 return Optional.empty();