import com.google.common.base.CharMatcher;
import com.google.common.base.Strings;
import java.io.UnsupportedEncodingException;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import javax.ws.rs.sse.Sse;
import javax.ws.rs.sse.SseEventSink;
import javax.xml.xpath.XPathExpressionException;
import org.opendaylight.restconf.nb.rfc8040.ReceiveEventsParams;
-import org.opendaylight.restconf.nb.rfc8040.streams.RestconfStream.EncodingName;
+import org.opendaylight.restconf.server.spi.RestconfStream;
+import org.opendaylight.restconf.server.spi.RestconfStream.EncodingName;
+import org.opendaylight.restconf.server.spi.RestconfStream.Sender;
import org.opendaylight.yangtools.concepts.Registration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
* SSE session handler that is responsible for controlling of session, managing subscription to data-change-event or
* notification listener, and sending of data over established SSE session.
*/
-public final class SSESessionHandler implements StreamSessionHandler {
- private static final Logger LOG = LoggerFactory.getLogger(SSESessionHandler.class);
+final class SSESender implements Sender {
+ private static final Logger LOG = LoggerFactory.getLogger(SSESender.class);
private static final CharMatcher CR_OR_LF = CharMatcher.anyOf("\r\n");
- private final ScheduledExecutorService executorService;
+ private final PingExecutor pingExecutor;
private final RestconfStream<?> stream;
private final EncodingName encoding;
private final ReceiveEventsParams params;
private final SseEventSink sink;
private final Sse sse;
private final int maximumFragmentLength;
- private final int heartbeatInterval;
+ private final long heartbeatMillis;
- private ScheduledFuture<?> pingProcess;
+ private Registration pingProcess;
private Registration subscriber;
/**
* Creation of the new server-sent events session handler.
*
- * @param executorService Executor that is used for periodical sending of SSE ping messages to keep session up even
+ * @param pingExecutor Executor that is used for periodical sending of SSE ping messages to keep session up even
* if the notifications doesn't flow from server to clients or clients don't implement ping-pong
* service.
* @param stream YANG notification or data-change event listener to which client on this SSE session subscribes to.
* (exceeded notification length ends in error). If the parameter is set to non-zero positive value,
* messages longer than this parameter are fragmented into multiple SSE messages sent in one
* transaction.
- * @param heartbeatInterval Interval in milliseconds of sending of ping control frames to remote endpoint to keep
+ * @param heartbeatMillis Interval in milliseconds of sending of ping control frames to remote endpoint to keep
* session up. Ping control frames are disabled if this parameter is set to 0.
*/
- public SSESessionHandler(final ScheduledExecutorService executorService, final SseEventSink sink, final Sse sse,
- final RestconfStream<?> stream, final EncodingName encoding, final ReceiveEventsParams params,
- final int maximumFragmentLength, final int heartbeatInterval) {
- this.executorService = requireNonNull(executorService);
+ SSESender(final PingExecutor pingExecutor, final SseEventSink sink, final Sse sse, final RestconfStream<?> stream,
+ final EncodingName encoding, final ReceiveEventsParams params, final int maximumFragmentLength,
+ final long heartbeatMillis) {
+ this.pingExecutor = requireNonNull(pingExecutor);
this.sse = requireNonNull(sse);
this.sink = requireNonNull(sink);
this.stream = requireNonNull(stream);
this.encoding = requireNonNull(encoding);
this.params = requireNonNull(params);
this.maximumFragmentLength = maximumFragmentLength;
- this.heartbeatInterval = heartbeatInterval;
+ this.heartbeatMillis = heartbeatMillis;
}
/**
}
subscriber = local;
- if (heartbeatInterval != 0) {
- pingProcess = executorService.scheduleWithFixedDelay(this::sendPingMessage, heartbeatInterval,
- heartbeatInterval, TimeUnit.MILLISECONDS);
+ if (heartbeatMillis != 0) {
+ pingProcess = pingExecutor.startPingProcess(this::sendPing, heartbeatMillis, TimeUnit.MILLISECONDS);
}
return true;
}
return outputMessage.toString();
}
- private synchronized void sendPingMessage() {
+ private synchronized void sendPing() {
if (!sink.isClosed()) {
LOG.debug("sending PING");
sink.send(sse.newEventBuilder().comment("ping").build());
}
private void stopPingProcess() {
- if (pingProcess != null && !pingProcess.isDone() && !pingProcess.isCancelled()) {
- pingProcess.cancel(true);
+ if (pingProcess != null) {
+ pingProcess.close();
+ pingProcess = null;
}
}