03542785310a2b044db36134a6bf0300fd992a1e
[netconf.git] / restconf / restconf-nb-rfc8040 / src / main / java / org / opendaylight / restconf / nb / rfc8040 / streams / WebSocketSessionHandler.java
1 /*
2  * Copyright © 2019 FRINX s.r.o. All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.restconf.nb.rfc8040.streams;
9
10 import com.google.common.base.Strings;
11 import java.io.IOException;
12 import java.net.InetSocketAddress;
13 import java.nio.ByteBuffer;
14 import java.nio.charset.Charset;
15 import java.util.ArrayList;
16 import java.util.List;
17 import java.util.Objects;
18 import java.util.Optional;
19 import java.util.concurrent.ScheduledExecutorService;
20 import java.util.concurrent.ScheduledFuture;
21 import java.util.concurrent.TimeUnit;
22 import org.eclipse.jetty.websocket.api.RemoteEndpoint;
23 import org.eclipse.jetty.websocket.api.Session;
24 import org.eclipse.jetty.websocket.api.annotations.OnWebSocketClose;
25 import org.eclipse.jetty.websocket.api.annotations.OnWebSocketConnect;
26 import org.eclipse.jetty.websocket.api.annotations.OnWebSocketError;
27 import org.eclipse.jetty.websocket.api.annotations.WebSocket;
28 import org.opendaylight.restconf.nb.rfc8040.streams.listeners.BaseListenerInterface;
29 import org.slf4j.Logger;
30 import org.slf4j.LoggerFactory;
31
32 /**
33  * Web-socket session handler that is responsible for controlling of session, managing subscription
34  * to data-change-event or notification listener, and sending of data over established web-socket session.
35  */
36 @WebSocket
37 public final class WebSocketSessionHandler implements StreamSessionHandler {
38     private static final Logger LOG = LoggerFactory.getLogger(WebSocketSessionHandler.class);
39     private static final byte[] PING_PAYLOAD = "ping".getBytes(Charset.defaultCharset());
40
41     private final ScheduledExecutorService executorService;
42     private final BaseListenerInterface listener;
43     private final int maximumFragmentLength;
44     private final int heartbeatInterval;
45
46     private Session session;
47     private ScheduledFuture<?> pingProcess;
48
49     /**
50      * Creation of the new web-socket session handler.
51      *
52      * @param executorService       Executor that is used for periodical sending of web-socket ping messages to keep
53      *                              session up even if the notifications doesn't flow from server to clients or clients
54      *                              don't implement ping-pong service.
55      * @param listener              YANG notification or data-change event listener to which client on this web-socket
56      *                              session subscribes to.
57      * @param maximumFragmentLength Maximum fragment length in number of Unicode code units (characters).
58      *                              If this parameter is set to 0, the maximum fragment length is disabled and messages
59      *                              up to 64 KB can be sent in TCP segment (exceeded notification length ends in error).
60      *                              If the parameter is set to non-zero positive value, messages longer than this
61      *                              parameter are fragmented into multiple web-socket messages sent in one transaction.
62      * @param heartbeatInterval     Interval in milliseconds of sending of ping control frames to remote endpoint
63      *                              to keep session up. Ping control frames are disabled if this parameter is set to 0.
64      */
65     WebSocketSessionHandler(final ScheduledExecutorService executorService, final BaseListenerInterface listener,
66             final int maximumFragmentLength, final int heartbeatInterval) {
67         this.executorService = executorService;
68         this.listener = listener;
69         this.maximumFragmentLength = maximumFragmentLength;
70         this.heartbeatInterval = heartbeatInterval;
71     }
72
73     /**
74      * Handling of the web-socket connected event (web-socket session between local server and remote endpoint has been
75      * established). Web-socket session handler is registered at data-change-event / YANG notification listener and
76      * the heartbeat ping process is started if it is enabled.
77      *
78      * @param webSocketSession Created web-socket session.
79      * @see OnWebSocketConnect More information about invocation of this method and parameters.
80      */
81     @OnWebSocketConnect
82     public synchronized void onWebSocketConnected(final Session webSocketSession) {
83         if (session == null || !session.isOpen()) {
84             session = webSocketSession;
85             listener.addSubscriber(this);
86             LOG.debug("A new web-socket session {} has been successfully registered.", webSocketSession);
87             if (heartbeatInterval != 0) {
88                 // sending of PING frame can be long if there is an error on web-socket - from this reason
89                 // the fixed-rate should not be used
90                 pingProcess = executorService.scheduleWithFixedDelay(this::sendPingMessage, heartbeatInterval,
91                         heartbeatInterval, TimeUnit.MILLISECONDS);
92             }
93         }
94     }
95
96     /**
97      * Handling of web-socket session closed event (timeout, error, or both parties closed session). Removal
98      * of subscription at listener and stopping of the ping process.
99      *
100      * @param statusCode Web-socket status code.
101      * @param reason     Reason, why the web-socket is closed (for example, reached timeout).
102      * @see OnWebSocketClose More information about invocation of this method and parameters.
103      */
104     @OnWebSocketClose
105     public synchronized void onWebSocketClosed(final int statusCode, final String reason) {
106         // note: there is not guarantee that Session.isOpen() returns true - it is better to not check it here
107         // using 'session != null && session.isOpen()'
108         if (session != null) {
109             LOG.debug("Web-socket session has been closed with status code {} and reason message: {}.",
110                     statusCode, reason);
111             listener.removeSubscriber(this);
112             stopPingProcess();
113         }
114     }
115
116     /**
117      * Handling of error in web-socket implementation. Subscription at listener is removed, open session is closed
118      * and ping process is stopped.
119      *
120      * @param error Error details.
121      * @see OnWebSocketError More information about invocation of this method and parameters.
122      */
123     @OnWebSocketError
124     public synchronized void onWebSocketError(final Throwable error) {
125         LOG.warn("An error occurred on web-socket: ", error);
126         if (session != null) {
127             LOG.warn("Trying to close web-socket session {} gracefully after error.", session);
128             listener.removeSubscriber(this);
129             if (session.isOpen()) {
130                 session.close();
131             }
132             stopPingProcess();
133         }
134     }
135
136     private void stopPingProcess() {
137         if (pingProcess != null && !pingProcess.isDone() && !pingProcess.isCancelled()) {
138             pingProcess.cancel(true);
139         }
140     }
141
142     /**
143      * Sensing of string message to remote endpoint of {@link org.eclipse.jetty.websocket.api.Session}. If the maximum
144      * fragment length is set to non-zero positive value and input message exceeds this value, message is fragmented
145      * to multiple message fragments which are send individually but still in one web-socket transaction.
146      *
147      * @param message Message data to be send over web-socket session.
148      */
149     @Override
150     public synchronized void sendDataMessage(final String message) {
151         if (Strings.isNullOrEmpty(message)) {
152             // FIXME: should this be tolerated?
153             return;
154         }
155
156         if (session != null && session.isOpen()) {
157             final RemoteEndpoint remoteEndpoint = session.getRemote();
158             if (maximumFragmentLength == 0 || message.length() <= maximumFragmentLength) {
159                 sendDataMessage(message, remoteEndpoint);
160             } else {
161                 sendFragmentedMessage(splitMessageToFragments(message, maximumFragmentLength), remoteEndpoint);
162             }
163         } else {
164             LOG.trace("Message with body '{}' is not sent because underlay web-socket session is not open.", message);
165         }
166     }
167
168     private void sendDataMessage(final String message, final RemoteEndpoint remoteEndpoint) {
169         try {
170             remoteEndpoint.sendString(message);
171             LOG.trace("Message with body '{}' has been successfully sent to remote endpoint {}.", message,
172                 remoteEndpoint);
173         } catch (IOException e) {
174             LOG.warn("Cannot send message over web-socket session {}.", session, e);
175         }
176     }
177
178     private void sendFragmentedMessage(final List<String> orderedFragments, final RemoteEndpoint remoteEndpoint) {
179         for (int i = 0; i < orderedFragments.size(); i++) {
180             final String fragment = orderedFragments.get(i);
181             final boolean last = i == orderedFragments.size() - 1;
182
183             try {
184                 remoteEndpoint.sendPartialString(fragment, last);
185             } catch (IOException e) {
186                 LOG.warn("Cannot send message fragment number {} over web-socket session {}. All other fragments of "
187                     + " the message are dropped too.", i, session, e);
188                 return;
189             }
190             LOG.trace("Message fragment number {} with body '{}' has been successfully sent to remote endpoint {}.", i,
191                 fragment, remoteEndpoint);
192         }
193     }
194
195     private synchronized void sendPingMessage() {
196         try {
197             Objects.requireNonNull(session).getRemote().sendPing(ByteBuffer.wrap(PING_PAYLOAD));
198         } catch (IOException e) {
199             LOG.warn("Cannot send ping message over web-socket session {}.", session, e);
200         }
201     }
202
203     private static List<String> splitMessageToFragments(final String inputMessage, final int maximumFragmentLength) {
204         final List<String> parts = new ArrayList<>();
205         int length = inputMessage.length();
206         for (int i = 0; i < length; i += maximumFragmentLength) {
207             parts.add(inputMessage.substring(i, Math.min(length, i + maximumFragmentLength)));
208         }
209         return parts;
210     }
211
212     /**
213      * Get remote endpoint address of the current web-socket session.
214      *
215      * @return If the session exists and is open the {@link InetSocketAddress} wrapped in {@link Optional} is returned.
216      *     Otherwise, {@link Optional#empty()} is returned.
217      */
218     // FIXME: remove this method?
219     public synchronized Optional<InetSocketAddress> getRemoteEndpointAddress() {
220         if (session != null && session.isOpen()) {
221             return Optional.of(session.getRemote().getInetSocketAddress());
222         } else {
223             return Optional.empty();
224         }
225     }
226
227     @Override
228     public synchronized boolean isConnected() {
229         if (session != null && session.isOpen()) {
230             return true;
231         } else {
232             return false;
233         }
234     }
235 }