Introduce restconf.server.{api,spi,mdsal}
[netconf.git] / restconf / restconf-nb / src / main / java / org / opendaylight / restconf / nb / rfc8040 / streams / WebSocketSender.java
similarity index 89%
rename from restconf/restconf-nb/src/main/java/org/opendaylight/restconf/nb/rfc8040/streams/WebSocketSessionHandler.java
rename to restconf/restconf-nb/src/main/java/org/opendaylight/restconf/nb/rfc8040/streams/WebSocketSender.java
index 0352834ddb3005bd8718e0e2df1055b7364db0f3..ff7075bb01a34f38bd85abda991ab742d94a747c 100644 (file)
@@ -17,8 +17,6 @@ import java.nio.charset.Charset;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Objects;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import javax.xml.xpath.XPathExpressionException;
@@ -31,7 +29,9 @@ import org.eclipse.jetty.websocket.api.annotations.OnWebSocketConnect;
 import org.eclipse.jetty.websocket.api.annotations.OnWebSocketError;
 import org.eclipse.jetty.websocket.api.annotations.WebSocket;
 import org.opendaylight.restconf.nb.rfc8040.ReceiveEventsParams;
-import org.opendaylight.restconf.nb.rfc8040.streams.RestconfStream.EncodingName;
+import org.opendaylight.restconf.server.spi.RestconfStream;
+import org.opendaylight.restconf.server.spi.RestconfStream.EncodingName;
+import org.opendaylight.restconf.server.spi.RestconfStream.Sender;
 import org.opendaylight.yangtools.concepts.Registration;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -41,25 +41,25 @@ import org.slf4j.LoggerFactory;
  * to data-change-event or notification listener, and sending of data over established web-socket session.
  */
 @WebSocket
-public final class WebSocketSessionHandler implements StreamSessionHandler {
-    private static final Logger LOG = LoggerFactory.getLogger(WebSocketSessionHandler.class);
+final class WebSocketSender implements Sender {
+    private static final Logger LOG = LoggerFactory.getLogger(WebSocketSender.class);
     private static final byte[] PING_PAYLOAD = "ping".getBytes(Charset.defaultCharset());
 
-    private final ScheduledExecutorService executorService;
+    private final PingExecutor pingExecutor;
     private final RestconfStream<?> stream;
     private final EncodingName encodingName;
     private final ReceiveEventsParams params;
     private final int maximumFragmentLength;
-    private final int heartbeatInterval;
+    private final long heartbeatInterval;
 
     private Session session;
     private Registration subscriber;
-    private ScheduledFuture<?> pingProcess;
+    private Registration pingProcess;
 
     /**
      * Creation of the new web-socket session handler.
      *
-     * @param executorService       Executor that is used for periodical sending of web-socket ping messages to keep
+     * @param pingExecutor          Executor that is used for periodical sending of web-socket ping messages to keep
      *                              session up even if the notifications doesn't flow from server to clients or clients
      *                              don't implement ping-pong service.
      * @param stream                YANG notification or data-change event listener to which client on this web-socket
@@ -72,10 +72,9 @@ public final class WebSocketSessionHandler implements StreamSessionHandler {
      * @param heartbeatInterval     Interval in milliseconds of sending of ping control frames to remote endpoint
      *                              to keep session up. Ping control frames are disabled if this parameter is set to 0.
      */
-    WebSocketSessionHandler(final ScheduledExecutorService executorService, final RestconfStream<?> stream,
-            final EncodingName encodingName, final @Nullable ReceiveEventsParams params,
-            final int maximumFragmentLength, final int heartbeatInterval) {
-        this.executorService = requireNonNull(executorService);
+    WebSocketSender(final PingExecutor pingExecutor, final RestconfStream<?> stream, final EncodingName encodingName,
+            final @Nullable ReceiveEventsParams params, final int maximumFragmentLength, final long heartbeatInterval) {
+        this.pingExecutor = requireNonNull(pingExecutor);
         this.stream = requireNonNull(stream);
         this.encodingName = requireNonNull(encodingName);
         // FIXME: NETCONF-1102: require params
@@ -109,8 +108,7 @@ public final class WebSocketSessionHandler implements StreamSessionHandler {
             if (heartbeatInterval != 0) {
                 // sending of PING frame can be long if there is an error on web-socket - from this reason
                 // the fixed-rate should not be used
-                pingProcess = executorService.scheduleWithFixedDelay(this::sendPingMessage, heartbeatInterval,
-                        heartbeatInterval, TimeUnit.MILLISECONDS);
+                pingProcess = pingExecutor.startPingProcess(this::sendPing, heartbeatInterval, TimeUnit.MILLISECONDS);
             }
         }
     }
@@ -167,8 +165,9 @@ public final class WebSocketSessionHandler implements StreamSessionHandler {
     }
 
     private void stopPingProcess() {
-        if (pingProcess != null && !pingProcess.isDone() && !pingProcess.isCancelled()) {
-            pingProcess.cancel(true);
+        if (pingProcess != null) {
+            pingProcess.close();
+            pingProcess = null;
         }
     }
 
@@ -233,7 +232,7 @@ public final class WebSocketSessionHandler implements StreamSessionHandler {
         }
     }
 
-    private synchronized void sendPingMessage() {
+    private synchronized void sendPing() {
         try {
             Objects.requireNonNull(session).getRemote().sendPing(ByteBuffer.wrap(PING_PAYLOAD));
         } catch (IOException e) {