Improve subscriber tracking
[netconf.git] / restconf / restconf-nb / src / main / java / org / opendaylight / restconf / nb / rfc8040 / streams / WebSocketSessionHandler.java
1 /*
2  * Copyright © 2019 FRINX s.r.o. All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.restconf.nb.rfc8040.streams;
9
10 import static java.util.Objects.requireNonNull;
11
12 import com.google.common.base.Strings;
13 import java.io.IOException;
14 import java.nio.ByteBuffer;
15 import java.nio.charset.Charset;
16 import java.util.ArrayList;
17 import java.util.List;
18 import java.util.Objects;
19 import java.util.concurrent.ScheduledExecutorService;
20 import java.util.concurrent.ScheduledFuture;
21 import java.util.concurrent.TimeUnit;
22 import java.util.concurrent.TimeoutException;
23 import org.eclipse.jetty.websocket.api.CloseException;
24 import org.eclipse.jetty.websocket.api.RemoteEndpoint;
25 import org.eclipse.jetty.websocket.api.Session;
26 import org.eclipse.jetty.websocket.api.annotations.OnWebSocketClose;
27 import org.eclipse.jetty.websocket.api.annotations.OnWebSocketConnect;
28 import org.eclipse.jetty.websocket.api.annotations.OnWebSocketError;
29 import org.eclipse.jetty.websocket.api.annotations.WebSocket;
30 import org.opendaylight.yangtools.concepts.Registration;
31 import org.slf4j.Logger;
32 import org.slf4j.LoggerFactory;
33
34 /**
35  * Web-socket session handler that is responsible for controlling of session, managing subscription
36  * to data-change-event or notification listener, and sending of data over established web-socket session.
37  */
38 @WebSocket
39 public final class WebSocketSessionHandler implements StreamSessionHandler {
40     private static final Logger LOG = LoggerFactory.getLogger(WebSocketSessionHandler.class);
41     private static final byte[] PING_PAYLOAD = "ping".getBytes(Charset.defaultCharset());
42
43     private final ScheduledExecutorService executorService;
44     private final RestconfStream<?> stream;
45     private final int maximumFragmentLength;
46     private final int heartbeatInterval;
47
48     private Session session;
49     private Registration subscriber;
50     private ScheduledFuture<?> pingProcess;
51
52     /**
53      * Creation of the new web-socket session handler.
54      *
55      * @param executorService       Executor that is used for periodical sending of web-socket ping messages to keep
56      *                              session up even if the notifications doesn't flow from server to clients or clients
57      *                              don't implement ping-pong service.
58      * @param stream                YANG notification or data-change event listener to which client on this web-socket
59      *                              session subscribes to.
60      * @param maximumFragmentLength Maximum fragment length in number of Unicode code units (characters).
61      *                              If this parameter is set to 0, the maximum fragment length is disabled and messages
62      *                              up to 64 KB can be sent in TCP segment (exceeded notification length ends in error).
63      *                              If the parameter is set to non-zero positive value, messages longer than this
64      *                              parameter are fragmented into multiple web-socket messages sent in one transaction.
65      * @param heartbeatInterval     Interval in milliseconds of sending of ping control frames to remote endpoint
66      *                              to keep session up. Ping control frames are disabled if this parameter is set to 0.
67      */
68     WebSocketSessionHandler(final ScheduledExecutorService executorService, final RestconfStream<?> stream,
69             final int maximumFragmentLength, final int heartbeatInterval) {
70         this.executorService = requireNonNull(executorService);
71         this.stream = requireNonNull(stream);
72         this.maximumFragmentLength = maximumFragmentLength;
73         this.heartbeatInterval = heartbeatInterval;
74     }
75
76     /**
77      * Handling of the web-socket connected event (web-socket session between local server and remote endpoint has been
78      * established). Web-socket session handler is registered at data-change-event / YANG notification listener and
79      * the heartbeat ping process is started if it is enabled.
80      *
81      * @param webSocketSession Created web-socket session.
82      * @see OnWebSocketConnect More information about invocation of this method and parameters.
83      */
84     @OnWebSocketConnect
85     public synchronized void onWebSocketConnected(final Session webSocketSession) {
86         if (session == null || !session.isOpen()) {
87             session = webSocketSession;
88             subscriber = stream.addSubscriber(this);
89
90             LOG.debug("A new web-socket session {} has been successfully registered.", webSocketSession);
91             if (heartbeatInterval != 0) {
92                 // sending of PING frame can be long if there is an error on web-socket - from this reason
93                 // the fixed-rate should not be used
94                 pingProcess = executorService.scheduleWithFixedDelay(this::sendPingMessage, heartbeatInterval,
95                         heartbeatInterval, TimeUnit.MILLISECONDS);
96             }
97         }
98     }
99
100     /**
101      * Handling of web-socket session closed event (timeout, error, or both parties closed session). Removal
102      * of subscription at listener and stopping of the ping process.
103      *
104      * @param statusCode Web-socket status code.
105      * @param reason     Reason, why the web-socket is closed (for example, reached timeout).
106      * @see OnWebSocketClose More information about invocation of this method and parameters.
107      */
108     @OnWebSocketClose
109     public synchronized void onWebSocketClosed(final int statusCode, final String reason) {
110         // note: there is not guarantee that Session.isOpen() returns true - it is better to not check it here
111         // using 'session != null && session.isOpen()'
112         if (session != null) {
113             LOG.debug("Web-socket session has been closed with status code {} and reason message: {}.", statusCode,
114                 reason);
115             if (subscriber != null) {
116                 subscriber.close();
117                 subscriber = null;
118             }
119             stopPingProcess();
120         }
121     }
122
123     /**
124      * Handling of error in web-socket implementation. Subscription at listener is removed, open session is closed
125      * and ping process is stopped.
126      *
127      * @param error Error details.
128      * @see OnWebSocketError More information about invocation of this method and parameters.
129      */
130     @OnWebSocketError
131     public synchronized void onWebSocketError(final Throwable error) {
132         if (error instanceof CloseException && error.getCause() instanceof TimeoutException timeout) {
133             // A timeout is expected, do not log the complete stack trace
134             LOG.info("Web-socket closed by timeout: {}", timeout.getMessage());
135         } else {
136             LOG.warn("An error occurred on web-socket: ", error);
137         }
138         if (session != null) {
139             LOG.info("Trying to close web-socket session {} gracefully after error.", session);
140             if (subscriber != null) {
141                 subscriber.close();
142                 subscriber = null;
143             }
144             if (session.isOpen()) {
145                 session.close();
146             }
147             stopPingProcess();
148         }
149     }
150
151     private void stopPingProcess() {
152         if (pingProcess != null && !pingProcess.isDone() && !pingProcess.isCancelled()) {
153             pingProcess.cancel(true);
154         }
155     }
156
157     @Override
158     public synchronized void endOfStream() {
159         if (session != null && session.isOpen()) {
160             session.close();
161         }
162         stopPingProcess();
163     }
164
165     /**
166      * Sensing of string message to remote endpoint of {@link org.eclipse.jetty.websocket.api.Session}. If the maximum
167      * fragment length is set to non-zero positive value and input message exceeds this value, message is fragmented
168      * to multiple message fragments which are send individually but still in one web-socket transaction.
169      *
170      * @param message Message data to be send over web-socket session.
171      */
172     @Override
173     public synchronized void sendDataMessage(final String message) {
174         if (Strings.isNullOrEmpty(message)) {
175             // FIXME: should this be tolerated?
176             return;
177         }
178
179         if (session != null && session.isOpen()) {
180             final var remoteEndpoint = session.getRemote();
181             if (maximumFragmentLength == 0 || message.length() <= maximumFragmentLength) {
182                 sendDataMessage(message, remoteEndpoint);
183             } else {
184                 sendFragmentedMessage(splitMessageToFragments(message, maximumFragmentLength), remoteEndpoint);
185             }
186         } else {
187             LOG.trace("Message with body '{}' is not sent because underlay web-socket session is not open.", message);
188         }
189     }
190
191     private void sendDataMessage(final String message, final RemoteEndpoint remoteEndpoint) {
192         try {
193             remoteEndpoint.sendString(message);
194             LOG.trace("Message with body '{}' has been successfully sent to remote endpoint {}.", message,
195                 remoteEndpoint);
196         } catch (IOException e) {
197             LOG.warn("Cannot send message over web-socket session {}.", session, e);
198         }
199     }
200
201     private void sendFragmentedMessage(final List<String> orderedFragments, final RemoteEndpoint remoteEndpoint) {
202         for (int i = 0; i < orderedFragments.size(); i++) {
203             final String fragment = orderedFragments.get(i);
204             final boolean last = i == orderedFragments.size() - 1;
205
206             try {
207                 remoteEndpoint.sendPartialString(fragment, last);
208             } catch (IOException e) {
209                 LOG.warn("Cannot send message fragment number {} over web-socket session {}. All other fragments of "
210                     + " the message are dropped too.", i, session, e);
211                 return;
212             }
213             LOG.trace("Message fragment number {} with body '{}' has been successfully sent to remote endpoint {}.", i,
214                 fragment, remoteEndpoint);
215         }
216     }
217
218     private synchronized void sendPingMessage() {
219         try {
220             Objects.requireNonNull(session).getRemote().sendPing(ByteBuffer.wrap(PING_PAYLOAD));
221         } catch (IOException e) {
222             LOG.warn("Cannot send ping message over web-socket session {}.", session, e);
223         }
224     }
225
226     private static List<String> splitMessageToFragments(final String inputMessage, final int maximumFragmentLength) {
227         final var parts = new ArrayList<String>();
228         int length = inputMessage.length();
229         for (int i = 0; i < length; i += maximumFragmentLength) {
230             parts.add(inputMessage.substring(i, Math.min(length, i + maximumFragmentLength)));
231         }
232         return parts;
233     }
234
235     @Override
236     public synchronized boolean isConnected() {
237         return session != null && session.isOpen();
238     }
239 }