2 * Copyright (c) 2020 Lumina Networks, Inc. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.restconf.nb.rfc8040.streams;
10 import static java.util.Objects.requireNonNull;
12 import com.google.common.annotations.VisibleForTesting;
13 import com.google.common.base.CharMatcher;
14 import com.google.common.base.Strings;
15 import java.io.UnsupportedEncodingException;
16 import java.util.concurrent.TimeUnit;
17 import javax.ws.rs.sse.Sse;
18 import javax.ws.rs.sse.SseEventSink;
19 import javax.xml.xpath.XPathExpressionException;
20 import org.opendaylight.restconf.nb.rfc8040.ReceiveEventsParams;
21 import org.opendaylight.restconf.server.spi.RestconfStream;
22 import org.opendaylight.restconf.server.spi.RestconfStream.EncodingName;
23 import org.opendaylight.restconf.server.spi.RestconfStream.Sender;
24 import org.opendaylight.yangtools.concepts.Registration;
25 import org.slf4j.Logger;
26 import org.slf4j.LoggerFactory;
29 * SSE session handler that is responsible for controlling the session, managing the subscription to a
30 * data-change-event or notification listener, and sending data over the established SSE session.
32 final class SSESender implements Sender {
33 private static final Logger LOG = LoggerFactory.getLogger(SSESender.class);
// Matches carriage return and line feed characters; used to strip existing line breaks before re-fragmenting.
34 private static final CharMatcher CR_OR_LF = CharMatcher.anyOf("\r\n");
// Starts the periodic ping task in init().
36 private final PingExecutor pingExecutor;
// Stream this sender is added to as a subscriber in init().
37 private final RestconfStream<?> stream;
// Encoding and receive parameters handed to stream.addSubscriber().
38 private final EncodingName encoding;
39 private final ReceiveEventsParams params;
// Sink that data events and pings are written to.
40 private final SseEventSink sink;
// Used to build outbound events for the sink.
41 private final Sse sse;
// 0 disables manual fragmentation in sendDataMessage().
42 private final int maximumFragmentLength;
// 0 disables the ping process in init().
43 private final long heartbeatMillis;
// Handle of the ping task; assigned in init() when heartbeats are enabled.
45 private Registration pingProcess;
// NOTE(review): presumably the registration obtained from stream.addSubscriber() — the assignment is not visible here.
46 private Registration subscriber;
/**
 * Create a new server-sent events session handler.
 *
 * @param pingExecutor Executor used for periodically sending SSE ping messages, which keep the session up even
 *                     if notifications don't flow from the server to the client
 * @param sink SSE event sink that data events and ping messages are written to
 * @param sse SSE entry point used to build outbound events
 * @param stream YANG notification or data-change event stream to which the client on this SSE session subscribes
 * @param encoding Encoding name handed to the stream when subscribing
 * @param params Receive-events parameters handed to the stream when subscribing
 * @param maximumFragmentLength Maximum fragment length in number of Unicode code units (characters). If set to 0,
 *                              manual fragmentation is disabled. If set to a non-zero positive value, messages
 *                              longer than this value are split into multiple fragments.
 * @param heartbeatMillis Interval in milliseconds between ping messages sent to the remote endpoint to keep the
 *                        session up. Ping messages are disabled if this parameter is set to 0.
 * @throws NullPointerException if any of the reference arguments is {@code null}
 */
SSESender(final PingExecutor pingExecutor, final SseEventSink sink, final Sse sse, final RestconfStream<?> stream,
        final EncodingName encoding, final ReceiveEventsParams params, final int maximumFragmentLength,
        final long heartbeatMillis) {
    // Primitive settings are stored as-is.
    this.heartbeatMillis = heartbeatMillis;
    this.maximumFragmentLength = maximumFragmentLength;

    // Every collaborator is mandatory.
    this.pingExecutor = requireNonNull(pingExecutor);
    this.sse = requireNonNull(sse);
    this.sink = requireNonNull(sink);
    this.stream = requireNonNull(stream);
    this.encoding = requireNonNull(encoding);
    this.params = requireNonNull(params);
}
77 * Initialization of SSE connection. SSE session handler is registered at data-change-event / YANG notification
78 * listener and the heartbeat ping process is started if it is enabled.
80 * @throws UnsupportedEncodingException if the subscriber cannot be instantiated
81 * @throws XPathExpressionException if the subscriber cannot be instantiated
82 * @throws IllegalArgumentException if the subscriber cannot be instantiated
84 public synchronized boolean init() throws UnsupportedEncodingException, XPathExpressionException {
// Add this sender to the stream as a subscriber, passing the configured encoding and receive parameters.
85 final var local = stream.addSubscriber(this, encoding, params);
// Heartbeats are optional: only a non-zero interval hands sendPing() to the ping executor.
91 if (heartbeatMillis != 0) {
92 pingProcess = pingExecutor.startPingProcess(this::sendPing, heartbeatMillis, TimeUnit.MILLISECONDS);
98 * Handling of SSE session close event. Removal of subscription at listener and stopping of the ping process.
101 synchronized void close() {
// Work on a local copy of the subscriber registration field.
102 final var local = subscriber;
110 * Sending of a string message to the outbound Server-Sent Events channel {@link SseEventSink}. SSE automatically
111 * splits data into fragments at new line characters. If the maximum fragment length is set to a non-zero positive
112 * value and the input message exceeds this value, the message is manually fragmented into multiple fragments which
113 * are sent individually. Previous fragmentation is removed.
115 * @param message Message data to be sent over the SSE session.
118 public synchronized void sendDataMessage(final String message) {
// Null and empty payloads are special-cased before anything is written to the sink.
119 if (Strings.isNullOrEmpty(message)) {
120 // FIXME: should this be tolerated?
// Only write while the sink is still open.
123 if (!sink.isClosed()) {
// Fragment only when a limit is configured (non-zero) and the message is longer than that limit.
124 final String toSend = maximumFragmentLength != 0 && message.length() > maximumFragmentLength
125 ? splitMessageToFragments(message) : message;
126 sink.send(sse.newEvent(toSend));
133 public synchronized void endOfStream() {
// NOTE(review): presumably called by the stream when no further events will follow — confirm against RestconfStream.Sender.
139 * Split message to fragments. SSE automatically fragment string with new line character.
140 * For manual fragmentation we will remove all new line characters
142 * @param message Message data to be split.
143 * @return splitted message
145 private String splitMessageToFragments(final String message) {
146 StringBuilder outputMessage = new StringBuilder();
147 String inputmessage = CR_OR_LF.removeFrom(message);
148 int length = inputmessage.length();
149 for (int i = 0; i < length; i += maximumFragmentLength) {
150 outputMessage.append(inputmessage, i, Math.min(length, i + maximumFragmentLength)).append("\r\n");
152 return outputMessage.toString();
155 private synchronized void sendPing() {
// Only write to the sink while it is still open.
156 if (!sink.isClosed()) {
157 LOG.debug("sending PING");
// The ping is an event built with only the comment "ping"; no data is set on it here.
158 sink.send(sse.newEventBuilder().comment("ping").build());
164 private void stopPingProcess() {
// Guard: the following applies only when a ping process registration is present.
165 if (pingProcess != null) {
171 // TODO: return some type of identification of the connection
173 public String toString() {
// Currently delegates to the sink's own string representation.
174 return sink.toString();