da1ee72e4c4cb478b438a2fa6763ca7dc428df71
[netconf.git] / restconf / restconf-nb-rfc8040 / src / main / java / org / opendaylight / restconf / nb / rfc8040 / streams / sse / SSESessionHandler.java
1 /*
2  * Copyright (c) 2020 Lumina Networks, Inc. All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.restconf.nb.rfc8040.streams.sse;
9
10 import com.google.common.base.CharMatcher;
11 import com.google.common.base.Strings;
12 import java.io.IOException;
13 import java.util.concurrent.ScheduledExecutorService;
14 import java.util.concurrent.ScheduledFuture;
15 import java.util.concurrent.TimeUnit;
16 import org.glassfish.jersey.media.sse.EventOutput;
17 import org.glassfish.jersey.media.sse.OutboundEvent;
18 import org.opendaylight.restconf.nb.rfc8040.streams.SessionHandlerInterface;
19 import org.opendaylight.restconf.nb.rfc8040.streams.listeners.BaseListenerInterface;
20 import org.slf4j.Logger;
21 import org.slf4j.LoggerFactory;
22
23 /**
24  * SSE session handler that is responsible for controlling of session, managing subscription to data-change-event or
25  * notification listener, and sending of data over established SSE session.
26  */
27 public class SSESessionHandler implements SessionHandlerInterface {
28     private static final Logger LOG = LoggerFactory.getLogger(SSESessionHandler.class);
29     private static final String PING_PAYLOAD = "ping";
30
31     private static final CharMatcher CR_OR_LF = CharMatcher.anyOf("\r\n");
32
33     private final ScheduledExecutorService executorService;
34     private final BaseListenerInterface listener;
35     private final int maximumFragmentLength;
36     private final int heartbeatInterval;
37     private final EventOutput output;
38     private ScheduledFuture<?> pingProcess;
39
40     /**
41      * Creation of the new server-sent events session handler.
42      *
43      * @param executorService Executor that is used for periodical sending of SSE ping messages to keep session up even
44      *            if the notifications doesn't flow from server to clients or clients don't implement ping-pong
45      *            service.
46      * @param listener YANG notification or data-change event listener to which client on this SSE session subscribes
47      *            to.
48      * @param maximumFragmentLength Maximum fragment length in number of Unicode code units (characters). If this
49      *            parameter is set to 0, the maximum fragment length is disabled and messages up to 64 KB can be sent
50      *            (exceeded notification length ends in error). If the parameter is set to non-zero positive value,
51      *            messages longer than this parameter are fragmented into multiple SSE messages sent in one
52      *            transaction.
53      * @param heartbeatInterval Interval in milliseconds of sending of ping control frames to remote endpoint to keep
54      *            session up. Ping control frames are disabled if this parameter is set to 0.
55      */
56     public SSESessionHandler(final ScheduledExecutorService executorService, final EventOutput output,
57             final BaseListenerInterface listener, final int maximumFragmentLength, final int heartbeatInterval) {
58         this.executorService = executorService;
59         this.output = output;
60         this.listener = listener;
61         this.maximumFragmentLength = maximumFragmentLength;
62         this.heartbeatInterval = heartbeatInterval;
63     }
64
65     /**
66      * Initialization of SSE connection. SSE session handler is registered at data-change-event / YANG notification
67      * listener and the heartbeat ping process is started if it is enabled.
68      */
69     public synchronized void init() {
70         listener.addSubscriber(this);
71         if (heartbeatInterval != 0) {
72             pingProcess = executorService.scheduleWithFixedDelay(this::sendPingMessage, heartbeatInterval,
73                     heartbeatInterval, TimeUnit.MILLISECONDS);
74         }
75     }
76
77     /**
78      * Handling of SSE session close event. Removal of subscription at listener and stopping of the ping process.
79      */
80     public synchronized void close() {
81         listener.removeSubscriber(this);
82         stopPingProcess();
83     }
84
85     /**
86      * Sending of string message to outbound Server-Sent Events channel
87      * {@link org.glassfish.jersey.media.sse.EventOutput}. SSE is automatically split to fragments with new line
88      * character. If the maximum fragment length is set to non-zero positive value and input message exceeds this
89      * value, message is manually fragmented to multiple message fragments which are send individually. Previous
90      * fragmentation is removed.
91      *
92      * @param message Message data to be send over web-socket session.
93      */
94     @Override
95     public synchronized void sendDataMessage(final String message) {
96         if (Strings.isNullOrEmpty(message)) {
97             // FIXME: should this be tolerated?
98             return;
99         }
100         if (output.isClosed()) {
101             close();
102             return;
103         }
104         if (maximumFragmentLength != 0 && message.length() > maximumFragmentLength) {
105             sendMessage(splitMessageToFragments(message));
106         } else {
107             sendMessage(message);
108         }
109     }
110
111     private void sendMessage(final String message) {
112         try {
113             output.write(new OutboundEvent.Builder().data(String.class, message).build());
114         } catch (IOException e) {
115             LOG.warn("Connection from client {} is closed", this);
116             LOG.debug("Connection from client is closed:", e);
117         }
118     }
119
120     private void sendComment(final String message) {
121         try {
122             output.write(new OutboundEvent.Builder().comment(message).build());
123         } catch (IOException e) {
124             LOG.warn("Connection from client {} is closed", this);
125             LOG.debug("Connection from client is closed:", e);
126         }
127     }
128
129     /**
130      * Split message to fragments. SSE automatically fragment string with new line character.
131      * For manual fragmentation we will remove all new line characters
132      *
133      * @param message Message data to be split.
134      * @return splitted message
135      */
136     private String splitMessageToFragments(final String message) {
137         StringBuilder outputMessage = new StringBuilder();
138         String inputmessage = CR_OR_LF.removeFrom(message);
139         int length = inputmessage.length();
140         for (int i = 0; i < length; i += maximumFragmentLength) {
141             outputMessage.append(inputmessage, i, Math.min(length, i + maximumFragmentLength)).append("\r\n");
142         }
143         return outputMessage.toString();
144     }
145
146     private synchronized void sendPingMessage() {
147         if (output.isClosed()) {
148             close();
149             return;
150         }
151         LOG.debug("sending PING:{}", PING_PAYLOAD);
152         sendComment(PING_PAYLOAD);
153     }
154
155     private void stopPingProcess() {
156         if (pingProcess != null && !pingProcess.isDone() && !pingProcess.isCancelled()) {
157             pingProcess.cancel(true);
158         }
159     }
160
161     @Override
162     public synchronized boolean isConnected() {
163         return !output.isClosed();
164     }
165
166     // TODO:return some type of identification of connection
167     @Override
168     public String toString() {
169         return output.toString();
170     }
171 }