import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import javax.xml.xpath.XPathExpressionException;
import org.eclipse.jetty.websocket.api.annotations.OnWebSocketError;
import org.eclipse.jetty.websocket.api.annotations.WebSocket;
import org.opendaylight.restconf.nb.rfc8040.ReceiveEventsParams;
-import org.opendaylight.restconf.nb.rfc8040.streams.RestconfStream.EncodingName;
+import org.opendaylight.restconf.server.spi.RestconfStream;
+import org.opendaylight.restconf.server.spi.RestconfStream.EncodingName;
+import org.opendaylight.restconf.server.spi.RestconfStream.Sender;
import org.opendaylight.yangtools.concepts.Registration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
* to data-change-event or notification listener, and sending of data over established web-socket session.
*/
@WebSocket
-public final class WebSocketSessionHandler implements StreamSessionHandler {
- private static final Logger LOG = LoggerFactory.getLogger(WebSocketSessionHandler.class);
+final class WebSocketSender implements Sender {
+ private static final Logger LOG = LoggerFactory.getLogger(WebSocketSender.class);
private static final byte[] PING_PAYLOAD = "ping".getBytes(Charset.defaultCharset());
- private final ScheduledExecutorService executorService;
+ private final PingExecutor pingExecutor;
private final RestconfStream<?> stream;
private final EncodingName encodingName;
private final ReceiveEventsParams params;
private final int maximumFragmentLength;
- private final int heartbeatInterval;
+ private final long heartbeatInterval;
private Session session;
private Registration subscriber;
- private ScheduledFuture<?> pingProcess;
+ private Registration pingProcess;
/**
* Creation of the new web-socket session handler.
*
- * @param executorService Executor that is used for periodical sending of web-socket ping messages to keep
+ * @param pingExecutor Executor that is used for periodical sending of web-socket ping messages to keep
* session up even if the notifications doesn't flow from server to clients or clients
* don't implement ping-pong service.
* @param stream YANG notification or data-change event listener to which client on this web-socket
* @param heartbeatInterval Interval in milliseconds of sending of ping control frames to remote endpoint
* to keep session up. Ping control frames are disabled if this parameter is set to 0.
*/
- WebSocketSessionHandler(final ScheduledExecutorService executorService, final RestconfStream<?> stream,
- final EncodingName encodingName, final @Nullable ReceiveEventsParams params,
- final int maximumFragmentLength, final int heartbeatInterval) {
- this.executorService = requireNonNull(executorService);
+ WebSocketSender(final PingExecutor pingExecutor, final RestconfStream<?> stream, final EncodingName encodingName,
+ final @Nullable ReceiveEventsParams params, final int maximumFragmentLength, final long heartbeatInterval) {
+ this.pingExecutor = requireNonNull(pingExecutor);
this.stream = requireNonNull(stream);
this.encodingName = requireNonNull(encodingName);
// FIXME: NETCONF-1102: require params
if (heartbeatInterval != 0) {
// sending of PING frame can be long if there is an error on web-socket - from this reason
// the fixed-rate should not be used
- pingProcess = executorService.scheduleWithFixedDelay(this::sendPingMessage, heartbeatInterval,
- heartbeatInterval, TimeUnit.MILLISECONDS);
+ pingProcess = pingExecutor.startPingProcess(this::sendPing, heartbeatInterval, TimeUnit.MILLISECONDS);
}
}
}
}
private void stopPingProcess() {
- if (pingProcess != null && !pingProcess.isDone() && !pingProcess.isCancelled()) {
- pingProcess.cancel(true);
+ if (pingProcess != null) {
+ pingProcess.close();
+ pingProcess = null;
}
}
}
}
- private synchronized void sendPingMessage() {
+ private synchronized void sendPing() {
try {
Objects.requireNonNull(session).getRemote().sendPing(ByteBuffer.wrap(PING_PAYLOAD));
} catch (IOException e) {