2 * Copyright (c) 2020 Lumina Networks, Inc. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.restconf.nb.rfc8040.streams;
10 import com.google.common.base.CharMatcher;
11 import com.google.common.base.Strings;
12 import java.util.concurrent.ScheduledExecutorService;
13 import java.util.concurrent.ScheduledFuture;
14 import java.util.concurrent.TimeUnit;
15 import javax.ws.rs.sse.Sse;
16 import javax.ws.rs.sse.SseEventSink;
17 import org.slf4j.Logger;
18 import org.slf4j.LoggerFactory;
21 * SSE session handler that is responsible for controlling of session, managing subscription to data-change-event or
22 * notification listener, and sending of data over established SSE session.
24 public final class SSESessionHandler implements StreamSessionHandler {
25 private static final Logger LOG = LoggerFactory.getLogger(SSESessionHandler.class);
26 private static final String PING_PAYLOAD = "ping";
28 private static final CharMatcher CR_OR_LF = CharMatcher.anyOf("\r\n");
30 private final ScheduledExecutorService executorService;
31 private final BaseListenerInterface listener;
32 private final int maximumFragmentLength;
33 private final int heartbeatInterval;
34 private final SseEventSink sink;
35 private final Sse sse;
37 private ScheduledFuture<?> pingProcess;
40 * Creation of the new server-sent events session handler.
42 * @param executorService Executor that is used for periodical sending of SSE ping messages to keep session up even
43 * if the notifications doesn't flow from server to clients or clients don't implement ping-pong
45 * @param listener YANG notification or data-change event listener to which client on this SSE session subscribes
47 * @param maximumFragmentLength Maximum fragment length in number of Unicode code units (characters). If this
48 * parameter is set to 0, the maximum fragment length is disabled and messages up to 64 KB can be sent
49 * (exceeded notification length ends in error). If the parameter is set to non-zero positive value,
50 * messages longer than this parameter are fragmented into multiple SSE messages sent in one
52 * @param heartbeatInterval Interval in milliseconds of sending of ping control frames to remote endpoint to keep
53 * session up. Ping control frames are disabled if this parameter is set to 0.
55 public SSESessionHandler(final ScheduledExecutorService executorService, final SseEventSink sink, final Sse sse,
56 final BaseListenerInterface listener, final int maximumFragmentLength, final int heartbeatInterval) {
57 this.executorService = executorService;
60 this.listener = listener;
61 this.maximumFragmentLength = maximumFragmentLength;
62 this.heartbeatInterval = heartbeatInterval;
66 * Initialization of SSE connection. SSE session handler is registered at data-change-event / YANG notification
67 * listener and the heartbeat ping process is started if it is enabled.
69 public synchronized void init() {
70 listener.addSubscriber(this);
71 if (heartbeatInterval != 0) {
72 pingProcess = executorService.scheduleWithFixedDelay(this::sendPingMessage, heartbeatInterval,
73 heartbeatInterval, TimeUnit.MILLISECONDS);
78 * Handling of SSE session close event. Removal of subscription at listener and stopping of the ping process.
80 public synchronized void close() {
81 listener.removeSubscriber(this);
86 * Sending of string message to outbound Server-Sent Events channel {@link SseEventSink}. SSE is automatically split
87 * to fragments with new line character. If the maximum fragment length is set to non-zero positive value and input
88 * message exceeds this value, message is manually fragmented to multiple message fragments which are send
89 * individually. Previous fragmentation is removed.
91 * @param message Message data to be send over web-socket session.
94 public synchronized void sendDataMessage(final String message) {
95 if (Strings.isNullOrEmpty(message)) {
96 // FIXME: should this be tolerated?
99 if (!sink.isClosed()) {
100 final String toSend = maximumFragmentLength != 0 && message.length() > maximumFragmentLength
101 ? splitMessageToFragments(message) : message;
102 sink.send(sse.newEvent(toSend));
109 * Split message to fragments. SSE automatically fragment string with new line character.
110 * For manual fragmentation we will remove all new line characters
112 * @param message Message data to be split.
113 * @return splitted message
115 private String splitMessageToFragments(final String message) {
116 StringBuilder outputMessage = new StringBuilder();
117 String inputmessage = CR_OR_LF.removeFrom(message);
118 int length = inputmessage.length();
119 for (int i = 0; i < length; i += maximumFragmentLength) {
120 outputMessage.append(inputmessage, i, Math.min(length, i + maximumFragmentLength)).append("\r\n");
122 return outputMessage.toString();
125 private synchronized void sendPingMessage() {
126 if (!sink.isClosed()) {
127 LOG.debug("sending PING:{}", PING_PAYLOAD);
128 sink.send(sse.newEventBuilder().comment(PING_PAYLOAD).build());
134 private void stopPingProcess() {
135 if (pingProcess != null && !pingProcess.isDone() && !pingProcess.isCancelled()) {
136 pingProcess.cancel(true);
141 public synchronized boolean isConnected() {
142 return !sink.isClosed();
145 // TODO:return some type of identification of connection
147 public String toString() {
148 return sink.toString();