Use empty line as message separator in SSE stream 07/113107/3
authorRuslan Kashapov <ruslan.kashapov@pantheon.tech>
Tue, 13 Aug 2024 08:47:16 +0000 (11:47 +0300)
committerIvan Hrasko <ivan.hrasko@pantheon.tech>
Mon, 26 Aug 2024 11:02:05 +0000 (13:02 +0200)
The double end-line as a message separator wasn't clearly described
in a spec, so it was missed initially (NETCONF-1339).

The external clients like postman rely on this separator
and it makes it consistent with jaxrs implementation.

Also:
- fixed javadoc reference
- lowered log level for every connection message
- using proper ping message wrapper

JIRA: NETCONF-1366
Change-Id: I0cf9cac5cf707315928e0e739f3ad5776271fbc6
Signed-off-by: Ruslan Kashapov <ruslan.kashapov@pantheon.tech>
Signed-off-by: Ivan Hrasko <ivan.hrasko@pantheon.tech>
transport/transport-http/src/main/java/org/opendaylight/netconf/transport/http/ServerSseHandler.java

index 69b623cc0e0903ca56ffa99255840bed6a8bfaeb..7cb21665ec165ebb048e4dc2921b6e7a22d2c554 100644 (file)
@@ -11,7 +11,6 @@ import static java.util.Objects.requireNonNull;
 import static org.opendaylight.netconf.transport.http.Http2Utils.copyStreamId;
 import static org.opendaylight.netconf.transport.http.SseUtils.chunksOf;
 
-import com.google.common.util.concurrent.FutureCallback;
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.Unpooled;
 import io.netty.channel.ChannelHandlerContext;
@@ -38,7 +37,7 @@ import org.slf4j.LoggerFactory;
  *
  * <p>
  * Intercepts GET requests with {@code accept=text/event-stream} header, invokes the
- * {@link EventStreamService#startEventStream(String, EventStreamListener, FutureCallback)} using
+ * {@link EventStreamService#startEventStream(String, EventStreamListener, StartCallback)} using
  * request {@code URI} as parameter. If request is accepted the handler starts an event stream as
  * {@code transfer-encoding=chunked} response: headers are sent immediately, body chunks are sent on
  * service events.
@@ -50,8 +49,9 @@ import org.slf4j.LoggerFactory;
  */
 final class ServerSseHandler extends ChannelInboundHandlerAdapter implements EventStreamListener {
     private static final Logger LOG = LoggerFactory.getLogger(ServerSseHandler.class);
-    private static final ByteBuf PING_MESSAGE = Unpooled.wrappedBuffer(":ping\r\n".getBytes(StandardCharsets.UTF_8));
-
+    private static final ByteBuf PING_MESSAGE =
+        Unpooled.wrappedBuffer(":ping\r\n\r\n".getBytes(StandardCharsets.UTF_8));
+    private static final ByteBuf EMPTY_LINE = Unpooled.wrappedBuffer("\r\n".getBytes(StandardCharsets.UTF_8));
     private final int maxFieldValueLength;
     private final int heartbeatIntervalMillis;
     private final EventStreamService service;
@@ -71,7 +71,7 @@ final class ServerSseHandler extends ChannelInboundHandlerAdapter implements Eve
     public void handlerAdded(final ChannelHandlerContext ctx) throws Exception {
         context = ctx;
         ctx.channel().closeFuture().addListener(ignored -> unregister());
-        LOG.info("Server SSE enabled on channel {}", context.channel());
+        LOG.debug("Server SSE enabled on channel {}", context.channel());
     }
 
     @Override
@@ -154,6 +154,7 @@ final class ServerSseHandler extends ChannelInboundHandlerAdapter implements Eve
         if (isChannelWritable()) {
             chunksOf(fieldName, fieldValue, maxFieldValueLength, context.alloc())
                 .forEach(chunk -> context.writeAndFlush(new DefaultHttpContent(chunk)));
+            context.writeAndFlush(new DefaultHttpContent(EMPTY_LINE.copy()));
         }
     }
 
@@ -181,7 +182,7 @@ final class ServerSseHandler extends ChannelInboundHandlerAdapter implements Eve
 
     private void sendPing() {
         if (isChannelWritable() && streaming) {
-            context.writeAndFlush(PING_MESSAGE.copy());
+            context.writeAndFlush(new DefaultHttpContent(PING_MESSAGE.copy()));
             schedulePing();
         }
     }