Introduce restconf.server.{api,spi,mdsal}
[netconf.git] / restconf / restconf-nb / src / main / java / org / opendaylight / restconf / nb / rfc8040 / streams / SSESender.java
similarity index 80%
rename from restconf/restconf-nb/src/main/java/org/opendaylight/restconf/nb/rfc8040/streams/SSESessionHandler.java
rename to restconf/restconf-nb/src/main/java/org/opendaylight/restconf/nb/rfc8040/streams/SSESender.java
index b5e96939f9641d27a7d57f740e532e0efb70d94c..3aed886699fd361c107c4743fa94b819a15044b0 100644 (file)
@@ -13,14 +13,14 @@ import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.CharMatcher;
 import com.google.common.base.Strings;
 import java.io.UnsupportedEncodingException;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
 import javax.ws.rs.sse.Sse;
 import javax.ws.rs.sse.SseEventSink;
 import javax.xml.xpath.XPathExpressionException;
 import org.opendaylight.restconf.nb.rfc8040.ReceiveEventsParams;
-import org.opendaylight.restconf.nb.rfc8040.streams.RestconfStream.EncodingName;
+import org.opendaylight.restconf.server.spi.RestconfStream;
+import org.opendaylight.restconf.server.spi.RestconfStream.EncodingName;
+import org.opendaylight.restconf.server.spi.RestconfStream.Sender;
 import org.opendaylight.yangtools.concepts.Registration;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -29,26 +29,26 @@ import org.slf4j.LoggerFactory;
  * SSE session handler that is responsible for controlling of session, managing subscription to data-change-event or
  * notification listener, and sending of data over established SSE session.
  */
-public final class SSESessionHandler implements StreamSessionHandler {
-    private static final Logger LOG = LoggerFactory.getLogger(SSESessionHandler.class);
+final class SSESender implements Sender {
+    private static final Logger LOG = LoggerFactory.getLogger(SSESender.class);
     private static final CharMatcher CR_OR_LF = CharMatcher.anyOf("\r\n");
 
-    private final ScheduledExecutorService executorService;
+    private final PingExecutor pingExecutor;
     private final RestconfStream<?> stream;
     private final EncodingName encoding;
     private final ReceiveEventsParams params;
     private final SseEventSink sink;
     private final Sse sse;
     private final int maximumFragmentLength;
-    private final int heartbeatInterval;
+    private final long heartbeatMillis;
 
-    private ScheduledFuture<?> pingProcess;
+    private Registration pingProcess;
     private Registration subscriber;
 
     /**
      * Creation of the new server-sent events session handler.
      *
-     * @param executorService Executor that is used for periodical sending of SSE ping messages to keep session up even
+     * @param pingExecutor Executor that is used for periodical sending of SSE ping messages to keep session up even
      *            if the notifications doesn't flow from server to clients or clients don't implement ping-pong
      *            service.
      * @param stream YANG notification or data-change event listener to which client on this SSE session subscribes to.
@@ -57,20 +57,20 @@ public final class SSESessionHandler implements StreamSessionHandler {
      *            (exceeded notification length ends in error). If the parameter is set to non-zero positive value,
      *            messages longer than this parameter are fragmented into multiple SSE messages sent in one
      *            transaction.
-     * @param heartbeatInterval Interval in milliseconds of sending of ping control frames to remote endpoint to keep
+     * @param heartbeatMillis Interval in milliseconds of sending of ping control frames to remote endpoint to keep
      *            session up. Ping control frames are disabled if this parameter is set to 0.
      */
-    public SSESessionHandler(final ScheduledExecutorService executorService, final SseEventSink sink, final Sse sse,
-            final RestconfStream<?> stream, final EncodingName encoding, final ReceiveEventsParams params,
-            final int maximumFragmentLength, final int heartbeatInterval) {
-        this.executorService = requireNonNull(executorService);
+    SSESender(final PingExecutor pingExecutor, final SseEventSink sink, final Sse sse, final RestconfStream<?> stream,
+            final EncodingName encoding, final ReceiveEventsParams params, final int maximumFragmentLength,
+            final long heartbeatMillis) {
+        this.pingExecutor = requireNonNull(pingExecutor);
         this.sse = requireNonNull(sse);
         this.sink = requireNonNull(sink);
         this.stream = requireNonNull(stream);
         this.encoding = requireNonNull(encoding);
         this.params = requireNonNull(params);
         this.maximumFragmentLength = maximumFragmentLength;
-        this.heartbeatInterval = heartbeatInterval;
+        this.heartbeatMillis = heartbeatMillis;
     }
 
     /**
@@ -88,9 +88,8 @@ public final class SSESessionHandler implements StreamSessionHandler {
         }
 
         subscriber = local;
-        if (heartbeatInterval != 0) {
-            pingProcess = executorService.scheduleWithFixedDelay(this::sendPingMessage, heartbeatInterval,
-                heartbeatInterval, TimeUnit.MILLISECONDS);
+        if (heartbeatMillis != 0) {
+            pingProcess = pingExecutor.startPingProcess(this::sendPing, heartbeatMillis, TimeUnit.MILLISECONDS);
         }
         return true;
     }
@@ -153,7 +152,7 @@ public final class SSESessionHandler implements StreamSessionHandler {
         return outputMessage.toString();
     }
 
-    private synchronized void sendPingMessage() {
+    private synchronized void sendPing() {
         if (!sink.isClosed()) {
             LOG.debug("sending PING");
             sink.send(sse.newEventBuilder().comment("ping").build());
@@ -163,8 +162,9 @@ public final class SSESessionHandler implements StreamSessionHandler {
     }
 
     private void stopPingProcess() {
-        if (pingProcess != null && !pingProcess.isDone() && !pingProcess.isCancelled()) {
-            pingProcess.cancel(true);
+        if (pingProcess != null) {
+            pingProcess.close();
+            pingProcess = null;
         }
     }