Switch to using SseEventSink
[netconf.git] / restconf / restconf-nb-rfc8040 / src / main / java / org / opendaylight / restconf / nb / rfc8040 / streams / sse / SSESessionHandler.java
1 /*
2  * Copyright (c) 2020 Lumina Networks, Inc. All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.restconf.nb.rfc8040.streams.sse;
9
10 import com.google.common.base.CharMatcher;
11 import com.google.common.base.Strings;
12 import java.util.concurrent.ScheduledExecutorService;
13 import java.util.concurrent.ScheduledFuture;
14 import java.util.concurrent.TimeUnit;
15 import javax.ws.rs.sse.Sse;
16 import javax.ws.rs.sse.SseEventSink;
17 import org.opendaylight.restconf.nb.rfc8040.streams.SessionHandlerInterface;
18 import org.opendaylight.restconf.nb.rfc8040.streams.listeners.BaseListenerInterface;
19 import org.slf4j.Logger;
20 import org.slf4j.LoggerFactory;
21
22 /**
23  * SSE session handler that is responsible for controlling of session, managing subscription to data-change-event or
24  * notification listener, and sending of data over established SSE session.
25  */
26 public class SSESessionHandler implements SessionHandlerInterface {
27     private static final Logger LOG = LoggerFactory.getLogger(SSESessionHandler.class);
28     private static final String PING_PAYLOAD = "ping";
29
30     private static final CharMatcher CR_OR_LF = CharMatcher.anyOf("\r\n");
31
32     private final ScheduledExecutorService executorService;
33     private final BaseListenerInterface listener;
34     private final int maximumFragmentLength;
35     private final int heartbeatInterval;
36     private final SseEventSink sink;
37     private final Sse sse;
38
39     private ScheduledFuture<?> pingProcess;
40
41     /**
42      * Creation of the new server-sent events session handler.
43      *
44      * @param executorService Executor that is used for periodical sending of SSE ping messages to keep session up even
45      *            if the notifications doesn't flow from server to clients or clients don't implement ping-pong
46      *            service.
47      * @param listener YANG notification or data-change event listener to which client on this SSE session subscribes
48      *            to.
49      * @param maximumFragmentLength Maximum fragment length in number of Unicode code units (characters). If this
50      *            parameter is set to 0, the maximum fragment length is disabled and messages up to 64 KB can be sent
51      *            (exceeded notification length ends in error). If the parameter is set to non-zero positive value,
52      *            messages longer than this parameter are fragmented into multiple SSE messages sent in one
53      *            transaction.
54      * @param heartbeatInterval Interval in milliseconds of sending of ping control frames to remote endpoint to keep
55      *            session up. Ping control frames are disabled if this parameter is set to 0.
56      */
57     public SSESessionHandler(final ScheduledExecutorService executorService, final SseEventSink sink, final Sse sse,
58             final BaseListenerInterface listener, final int maximumFragmentLength, final int heartbeatInterval) {
59         this.executorService = executorService;
60         this.sse = sse;
61         this.sink = sink;
62         this.listener = listener;
63         this.maximumFragmentLength = maximumFragmentLength;
64         this.heartbeatInterval = heartbeatInterval;
65     }
66
67     /**
68      * Initialization of SSE connection. SSE session handler is registered at data-change-event / YANG notification
69      * listener and the heartbeat ping process is started if it is enabled.
70      */
71     public synchronized void init() {
72         listener.addSubscriber(this);
73         if (heartbeatInterval != 0) {
74             pingProcess = executorService.scheduleWithFixedDelay(this::sendPingMessage, heartbeatInterval,
75                     heartbeatInterval, TimeUnit.MILLISECONDS);
76         }
77     }
78
79     /**
80      * Handling of SSE session close event. Removal of subscription at listener and stopping of the ping process.
81      */
82     public synchronized void close() {
83         listener.removeSubscriber(this);
84         stopPingProcess();
85     }
86
87     /**
88      * Sending of string message to outbound Server-Sent Events channel {@link SseEventSink}. SSE is automatically split
89      * to fragments with new line character. If the maximum fragment length is set to non-zero positive value and input
90      * message exceeds this value, message is manually fragmented to multiple message fragments which are send
91      * individually. Previous fragmentation is removed.
92      *
93      * @param message Message data to be send over web-socket session.
94      */
95     @Override
96     public synchronized void sendDataMessage(final String message) {
97         if (Strings.isNullOrEmpty(message)) {
98             // FIXME: should this be tolerated?
99             return;
100         }
101         if (!sink.isClosed()) {
102             final String toSend = maximumFragmentLength != 0 && message.length() > maximumFragmentLength
103                 ? splitMessageToFragments(message) : message;
104             sink.send(sse.newEvent(toSend));
105         } else {
106             close();
107         }
108     }
109
110     /**
111      * Split message to fragments. SSE automatically fragment string with new line character.
112      * For manual fragmentation we will remove all new line characters
113      *
114      * @param message Message data to be split.
115      * @return splitted message
116      */
117     private String splitMessageToFragments(final String message) {
118         StringBuilder outputMessage = new StringBuilder();
119         String inputmessage = CR_OR_LF.removeFrom(message);
120         int length = inputmessage.length();
121         for (int i = 0; i < length; i += maximumFragmentLength) {
122             outputMessage.append(inputmessage, i, Math.min(length, i + maximumFragmentLength)).append("\r\n");
123         }
124         return outputMessage.toString();
125     }
126
127     private synchronized void sendPingMessage() {
128         if (!sink.isClosed()) {
129             LOG.debug("sending PING:{}", PING_PAYLOAD);
130             sink.send(sse.newEventBuilder().comment(PING_PAYLOAD).build());
131         } else {
132             close();
133         }
134     }
135
136     private void stopPingProcess() {
137         if (pingProcess != null && !pingProcess.isDone() && !pingProcess.isCancelled()) {
138             pingProcess.cancel(true);
139         }
140     }
141
142     @Override
143     public synchronized boolean isConnected() {
144         return !sink.isClosed();
145     }
146
147     // TODO:return some type of identification of connection
148     @Override
149     public String toString() {
150         return sink.toString();
151     }
152 }