Expose streams with all supported encodings
[netconf.git] / restconf / restconf-nb / src / main / java / org / opendaylight / restconf / nb / rfc8040 / streams / SSESessionHandler.java
1 /*
2  * Copyright (c) 2020 Lumina Networks, Inc. All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.restconf.nb.rfc8040.streams;
9
10 import static java.util.Objects.requireNonNull;
11
12 import com.google.common.annotations.VisibleForTesting;
13 import com.google.common.base.CharMatcher;
14 import com.google.common.base.Strings;
15 import java.io.UnsupportedEncodingException;
16 import java.util.concurrent.ScheduledExecutorService;
17 import java.util.concurrent.ScheduledFuture;
18 import java.util.concurrent.TimeUnit;
19 import javax.ws.rs.sse.Sse;
20 import javax.ws.rs.sse.SseEventSink;
21 import javax.xml.xpath.XPathExpressionException;
22 import org.opendaylight.restconf.nb.rfc8040.ReceiveEventsParams;
23 import org.opendaylight.restconf.nb.rfc8040.streams.RestconfStream.EncodingName;
24 import org.opendaylight.yangtools.concepts.Registration;
25 import org.slf4j.Logger;
26 import org.slf4j.LoggerFactory;
27
28 /**
29  * SSE session handler that is responsible for controlling of session, managing subscription to data-change-event or
30  * notification listener, and sending of data over established SSE session.
31  */
32 public final class SSESessionHandler implements StreamSessionHandler {
33     private static final Logger LOG = LoggerFactory.getLogger(SSESessionHandler.class);
34     private static final CharMatcher CR_OR_LF = CharMatcher.anyOf("\r\n");
35
36     private final ScheduledExecutorService executorService;
37     private final RestconfStream<?> stream;
38     private final EncodingName encoding;
39     private final ReceiveEventsParams params;
40     private final SseEventSink sink;
41     private final Sse sse;
42     private final int maximumFragmentLength;
43     private final int heartbeatInterval;
44
45     private ScheduledFuture<?> pingProcess;
46     private Registration subscriber;
47
48     /**
49      * Creation of the new server-sent events session handler.
50      *
51      * @param executorService Executor that is used for periodical sending of SSE ping messages to keep session up even
52      *            if the notifications doesn't flow from server to clients or clients don't implement ping-pong
53      *            service.
54      * @param stream YANG notification or data-change event listener to which client on this SSE session subscribes to.
55      * @param maximumFragmentLength Maximum fragment length in number of Unicode code units (characters). If this
56      *            parameter is set to 0, the maximum fragment length is disabled and messages up to 64 KB can be sent
57      *            (exceeded notification length ends in error). If the parameter is set to non-zero positive value,
58      *            messages longer than this parameter are fragmented into multiple SSE messages sent in one
59      *            transaction.
60      * @param heartbeatInterval Interval in milliseconds of sending of ping control frames to remote endpoint to keep
61      *            session up. Ping control frames are disabled if this parameter is set to 0.
62      */
63     public SSESessionHandler(final ScheduledExecutorService executorService, final SseEventSink sink, final Sse sse,
64             final RestconfStream<?> stream, final EncodingName encoding, final ReceiveEventsParams params,
65             final int maximumFragmentLength, final int heartbeatInterval) {
66         this.executorService = requireNonNull(executorService);
67         this.sse = requireNonNull(sse);
68         this.sink = requireNonNull(sink);
69         this.stream = requireNonNull(stream);
70         this.encoding = requireNonNull(encoding);
71         this.params = requireNonNull(params);
72         this.maximumFragmentLength = maximumFragmentLength;
73         this.heartbeatInterval = heartbeatInterval;
74     }
75
76     /**
77      * Initialization of SSE connection. SSE session handler is registered at data-change-event / YANG notification
78      * listener and the heartbeat ping process is started if it is enabled.
79      *
80      * @throws UnsupportedEncodingException if the subscriber cannot be instantiated
81      * @throws XPathExpressionException if the subscriber cannot be instantiated
82      * @throws IllegalArgumentException if the subscriber cannot be instantiated
83      */
84     public synchronized boolean init() throws UnsupportedEncodingException, XPathExpressionException {
85         final var local = stream.addSubscriber(this, encoding, params);
86         if (local == null) {
87             return false;
88         }
89
90         subscriber = local;
91         if (heartbeatInterval != 0) {
92             pingProcess = executorService.scheduleWithFixedDelay(this::sendPingMessage, heartbeatInterval,
93                 heartbeatInterval, TimeUnit.MILLISECONDS);
94         }
95         return true;
96     }
97
98     /**
99      * Handling of SSE session close event. Removal of subscription at listener and stopping of the ping process.
100      */
101     @VisibleForTesting
102     synchronized void close() {
103         final var local = subscriber;
104         if (local != null) {
105             local.close();
106             stopPingProcess();
107         }
108     }
109
110     /**
111      * Sending of string message to outbound Server-Sent Events channel {@link SseEventSink}. SSE is automatically split
112      * to fragments with new line character. If the maximum fragment length is set to non-zero positive value and input
113      * message exceeds this value, message is manually fragmented to multiple message fragments which are send
114      * individually. Previous fragmentation is removed.
115      *
116      * @param message Message data to be send over web-socket session.
117      */
118     @Override
119     public synchronized void sendDataMessage(final String message) {
120         if (Strings.isNullOrEmpty(message)) {
121             // FIXME: should this be tolerated?
122             return;
123         }
124         if (!sink.isClosed()) {
125             final String toSend = maximumFragmentLength != 0 && message.length() > maximumFragmentLength
126                 ? splitMessageToFragments(message) : message;
127             sink.send(sse.newEvent(toSend));
128         } else {
129             close();
130         }
131     }
132
133     @Override
134     public synchronized void endOfStream() {
135         stopPingProcess();
136         sink.close();
137     }
138
139     /**
140      * Split message to fragments. SSE automatically fragment string with new line character.
141      * For manual fragmentation we will remove all new line characters
142      *
143      * @param message Message data to be split.
144      * @return splitted message
145      */
146     private String splitMessageToFragments(final String message) {
147         StringBuilder outputMessage = new StringBuilder();
148         String inputmessage = CR_OR_LF.removeFrom(message);
149         int length = inputmessage.length();
150         for (int i = 0; i < length; i += maximumFragmentLength) {
151             outputMessage.append(inputmessage, i, Math.min(length, i + maximumFragmentLength)).append("\r\n");
152         }
153         return outputMessage.toString();
154     }
155
156     private synchronized void sendPingMessage() {
157         if (!sink.isClosed()) {
158             LOG.debug("sending PING");
159             sink.send(sse.newEventBuilder().comment("ping").build());
160         } else {
161             close();
162         }
163     }
164
165     private void stopPingProcess() {
166         if (pingProcess != null && !pingProcess.isDone() && !pingProcess.isCancelled()) {
167             pingProcess.cancel(true);
168         }
169     }
170
171     // TODO:return some type of identification of connection
172     @Override
173     public String toString() {
174         return sink.toString();
175     }
176 }