import static org.opendaylight.netconf.transport.http.Http2Utils.copyStreamId;
import static org.opendaylight.netconf.transport.http.SseUtils.chunksOf;
-import com.google.common.util.concurrent.FutureCallback;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
*
* <p>
* Intercepts GET requests with {@code accept=text/event-stream} header, invokes the
- * {@link EventStreamService#startEventStream(String, EventStreamListener, FutureCallback)} using
+ * {@link EventStreamService#startEventStream(String, EventStreamListener, StartCallback)} using
* request {@code URI} as parameter. If request is accepted the handler starts an event stream as
* {@code transfer-encoding=chunked} response: headers are sent immediately, body chunks are sent on
* service events.
*/
final class ServerSseHandler extends ChannelInboundHandlerAdapter implements EventStreamListener {
private static final Logger LOG = LoggerFactory.getLogger(ServerSseHandler.class);
- private static final ByteBuf PING_MESSAGE = Unpooled.wrappedBuffer(":ping\r\n".getBytes(StandardCharsets.UTF_8));
-
+ private static final ByteBuf PING_MESSAGE =
+ Unpooled.wrappedBuffer(":ping\r\n\r\n".getBytes(StandardCharsets.UTF_8));
+ private static final ByteBuf EMPTY_LINE = Unpooled.wrappedBuffer("\r\n".getBytes(StandardCharsets.UTF_8));
private final int maxFieldValueLength;
private final int heartbeatIntervalMillis;
private final EventStreamService service;
public void handlerAdded(final ChannelHandlerContext ctx) throws Exception {
context = ctx;
ctx.channel().closeFuture().addListener(ignored -> unregister());
- LOG.info("Server SSE enabled on channel {}", context.channel());
+ LOG.debug("Server SSE enabled on channel {}", context.channel());
}
@Override
if (isChannelWritable()) {
chunksOf(fieldName, fieldValue, maxFieldValueLength, context.alloc())
.forEach(chunk -> context.writeAndFlush(new DefaultHttpContent(chunk)));
+ context.writeAndFlush(new DefaultHttpContent(EMPTY_LINE.copy()));
}
}
private void sendPing() {
if (isChannelWritable() && streaming) {
- context.writeAndFlush(PING_MESSAGE.copy());
+ context.writeAndFlush(new DefaultHttpContent(PING_MESSAGE.copy()));
schedulePing();
}
}