2 * Copyright (c) 2020 Lumina Networks, Inc. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.restconf.nb.rfc8040.streams.sse;
10 import com.google.common.base.Strings;
11 import java.io.IOException;
12 import java.util.concurrent.ScheduledExecutorService;
13 import java.util.concurrent.ScheduledFuture;
14 import java.util.concurrent.TimeUnit;
15 import java.util.regex.Pattern;
16 import org.glassfish.jersey.media.sse.EventOutput;
17 import org.glassfish.jersey.media.sse.OutboundEvent;
18 import org.opendaylight.restconf.nb.rfc8040.streams.SessionHandlerInterface;
19 import org.opendaylight.restconf.nb.rfc8040.streams.listeners.BaseListenerInterface;
20 import org.slf4j.Logger;
21 import org.slf4j.LoggerFactory;
24 * SSE session handler that is responsible for controlling of session, managing subscription to data-change-event or
25 * notification listener, and sending of data over established SSE session.
27 public class SSESessionHandler implements SessionHandlerInterface {
28 private static final Logger LOG = LoggerFactory.getLogger(SSESessionHandler.class);
29 private static final String PING_PAYLOAD = "ping";
30 // FIXME: this should be a simple CharMatcher
31 private static final Pattern NEWLINE = Pattern.compile("(\\r|\\n)");
33 private final ScheduledExecutorService executorService;
34 private final BaseListenerInterface listener;
35 private final int maximumFragmentLength;
36 private final int heartbeatInterval;
37 private final EventOutput output;
38 private ScheduledFuture<?> pingProcess;
41 * Creation of the new server-sent events session handler.
43 * @param executorService Executor that is used for periodical sending of SSE ping messages to keep session up even
44 * if the notifications doesn't flow from server to clients or clients don't implement ping-pong
46 * @param listener YANG notification or data-change event listener to which client on this SSE session subscribes
48 * @param maximumFragmentLength Maximum fragment length in number of Unicode code units (characters). If this
49 * parameter is set to 0, the maximum fragment length is disabled and messages up to 64 KB can be sent
50 * (exceeded notification length ends in error). If the parameter is set to non-zero positive value,
51 * messages longer than this parameter are fragmented into multiple SSE messages sent in one
53 * @param heartbeatInterval Interval in milliseconds of sending of ping control frames to remote endpoint to keep
54 * session up. Ping control frames are disabled if this parameter is set to 0.
56 public SSESessionHandler(final ScheduledExecutorService executorService, final EventOutput output,
57 final BaseListenerInterface listener, final int maximumFragmentLength, final int heartbeatInterval) {
58 this.executorService = executorService;
60 this.listener = listener;
61 this.maximumFragmentLength = maximumFragmentLength;
62 this.heartbeatInterval = heartbeatInterval;
66 * Initialization of SSE connection. SSE session handler is registered at data-change-event / YANG notification
67 * listener and the heartbeat ping process is started if it is enabled.
69 public synchronized void init() {
70 listener.addSubscriber(this);
71 if (heartbeatInterval != 0) {
72 pingProcess = executorService.scheduleWithFixedDelay(this::sendPingMessage, heartbeatInterval,
73 heartbeatInterval, TimeUnit.MILLISECONDS);
78 * Handling of SSE session close event. Removal of subscription at listener and stopping of the ping process.
80 public synchronized void close() {
81 listener.removeSubscriber(this);
86 * Sending of string message to outbound Server-Sent Events channel
87 * {@link org.glassfish.jersey.media.sse.EventOutput}. SSE is automatically split to fragments with new line
88 * character. If the maximum fragment length is set to non-zero positive value and input message exceeds this
89 * value, message is manually fragmented to multiple message fragments which are send individually. Previous
90 * fragmentation is removed.
92 * @param message Message data to be send over web-socket session.
95 public synchronized void sendDataMessage(final String message) {
96 if (Strings.isNullOrEmpty(message)) {
97 // FIXME: should this be tolerated?
100 if (output.isClosed()) {
104 if (maximumFragmentLength != 0 && message.length() > maximumFragmentLength) {
105 sendMessage(splitMessageToFragments(message));
107 sendMessage(message);
111 private void sendMessage(final String message) {
113 output.write(new OutboundEvent.Builder().data(String.class, message).build());
114 } catch (IOException e) {
115 LOG.warn("Connection from client {} is closed", this);
116 LOG.debug("Connection from client is closed:", e);
120 private void sendComment(final String message) {
122 output.write(new OutboundEvent.Builder().comment(message).build());
123 } catch (IOException e) {
124 LOG.warn("Connection from client {} is closed", this);
125 LOG.debug("Connection from client is closed:", e);
130 * Split message to fragments. SSE automatically fragment string with new line character.
131 * For manual fragmentation we will remove all new line characters
133 * @param message Message data to be split.
134 * @return splitted message
136 private String splitMessageToFragments(final String message) {
137 StringBuilder outputMessage = new StringBuilder();
138 String inputmessage = NEWLINE.matcher(message).replaceAll("");
139 int length = inputmessage.length();
140 for (int i = 0; i < length; i += maximumFragmentLength) {
141 outputMessage.append(inputmessage.substring(i, Math.min(length, i + maximumFragmentLength))).append("\r\n");
143 return outputMessage.toString();
146 private synchronized void sendPingMessage() {
147 if (output.isClosed()) {
151 LOG.debug("sending PING:{}", PING_PAYLOAD);
152 sendComment(PING_PAYLOAD);
155 private void stopPingProcess() {
156 if (pingProcess != null && !pingProcess.isDone() && !pingProcess.isCancelled()) {
157 pingProcess.cancel(true);
162 public synchronized boolean isConnected() {
163 return !output.isClosed();
166 // TODO:return some type of identification of connection
168 public String toString() {
169 return output.toString();