X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-rest-connector%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fsal%2Fstreams%2Fwebsockets%2FWebSocketServerHandler.java;h=ce12d34e083f2ca440c754ab736aed1834276e0f;hp=1918503c58eb00425a743340b2c65f8e46488d75;hb=17d82f582a6bc13c78be3b19954ff8c021180e93;hpb=1ee71ae58a03de1c1f8fd8c789e7921508ba9f59;ds=sidebyside diff --git a/opendaylight/md-sal/sal-rest-connector/src/main/java/org/opendaylight/controller/sal/streams/websockets/WebSocketServerHandler.java b/opendaylight/md-sal/sal-rest-connector/src/main/java/org/opendaylight/controller/sal/streams/websockets/WebSocketServerHandler.java index 1918503c58..ce12d34e08 100644 --- a/opendaylight/md-sal/sal-rest-connector/src/main/java/org/opendaylight/controller/sal/streams/websockets/WebSocketServerHandler.java +++ b/opendaylight/md-sal/sal-rest-connector/src/main/java/org/opendaylight/controller/sal/streams/websockets/WebSocketServerHandler.java @@ -8,6 +8,7 @@ import static io.netty.handler.codec.http.HttpResponseStatus.BAD_REQUEST; import static io.netty.handler.codec.http.HttpResponseStatus.FORBIDDEN; import static io.netty.handler.codec.http.HttpResponseStatus.INTERNAL_SERVER_ERROR; import static io.netty.handler.codec.http.HttpVersion.HTTP_1_1; + import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelFuture; @@ -25,162 +26,143 @@ import io.netty.handler.codec.http.websocketx.WebSocketFrame; import io.netty.handler.codec.http.websocketx.WebSocketServerHandshaker; import io.netty.handler.codec.http.websocketx.WebSocketServerHandshakerFactory; import io.netty.util.CharsetUtil; - import java.io.IOException; - import org.opendaylight.controller.sal.streams.listeners.ListenerAdapter; import org.opendaylight.controller.sal.streams.listeners.Notificator; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** - * {@link WebSocketServerHandler} is implementation of - * {@link SimpleChannelInboundHandler} which allow handle + * {@link WebSocketServerHandler} is implementation of {@link SimpleChannelInboundHandler} which allow handle * {@link FullHttpRequest} and {@link WebSocketFrame} messages. */ public class WebSocketServerHandler extends SimpleChannelInboundHandler { - private static final Logger logger = LoggerFactory - .getLogger(WebSocketServerHandler.class); - - private WebSocketServerHandshaker handshaker; - - @Override - protected void channelRead0(ChannelHandlerContext ctx, Object msg) - throws Exception { - if (msg instanceof FullHttpRequest) { - handleHttpRequest(ctx, (FullHttpRequest) msg); - } else if (msg instanceof WebSocketFrame) { - handleWebSocketFrame(ctx, (WebSocketFrame) msg); - } - } - - /** - * Checks if HTTP request method is GET and if is possible to decode HTTP - * result of request. - * - * @param ctx - * ChannelHandlerContext - * @param req - * FullHttpRequest - */ - private void handleHttpRequest(ChannelHandlerContext ctx, - FullHttpRequest req) throws Exception { - // Handle a bad request. - if (!req.getDecoderResult().isSuccess()) { - sendHttpResponse(ctx, req, new DefaultFullHttpResponse(HTTP_1_1, - BAD_REQUEST)); - return; - } - - // Allow only GET methods. - if (req.getMethod() != GET) { - sendHttpResponse(ctx, req, new DefaultFullHttpResponse(HTTP_1_1, - FORBIDDEN)); - return; - } - - String streamName = Notificator.createStreamNameFromUri(req.getUri()); - ListenerAdapter listener = Notificator.getListenerFor(streamName); - if (listener != null) { - listener.addSubscriber(ctx.channel()); - logger.debug("Subscriber successfully registered."); - } else { - logger.error("Listener for stream with name '{}' was not found.", - streamName); - sendHttpResponse(ctx, req, new DefaultFullHttpResponse(HTTP_1_1, - INTERNAL_SERVER_ERROR)); - } - - // Handshake - WebSocketServerHandshakerFactory wsFactory = new WebSocketServerHandshakerFactory( - getWebSocketLocation(req), null, false); - handshaker = wsFactory.newHandshaker(req); - if (handshaker == null) { - WebSocketServerHandshakerFactory - .sendUnsupportedWebSocketVersionResponse(ctx.channel()); - } else { - handshaker.handshake(ctx.channel(), req); - } - - } - - /** - * Checks response status, send response and close connection if necessary - * - * @param ctx - * ChannelHandlerContext - * @param req - * HttpRequest - * @param res - * FullHttpResponse - */ - private static void sendHttpResponse(ChannelHandlerContext ctx, - HttpRequest req, FullHttpResponse res) { - // Generate an error page if response getStatus code is not OK (200). - if (res.getStatus().code() != 200) { - ByteBuf buf = Unpooled.copiedBuffer(res.getStatus().toString(), - CharsetUtil.UTF_8); - res.content().writeBytes(buf); - buf.release(); - setContentLength(res, res.content().readableBytes()); - } - - // Send the response and close the connection if necessary. - ChannelFuture f = ctx.channel().writeAndFlush(res); - if (!isKeepAlive(req) || res.getStatus().code() != 200) { - f.addListener(ChannelFutureListener.CLOSE); - } - } - - /** - * Handles web socket frame. - * - * @param ctx - * {@link ChannelHandlerContext} - * @param frame - * {@link WebSocketFrame} - */ - private void handleWebSocketFrame(ChannelHandlerContext ctx, - WebSocketFrame frame) throws IOException { - if (frame instanceof CloseWebSocketFrame) { - handshaker.close(ctx.channel(), - (CloseWebSocketFrame) frame.retain()); - String streamName = Notificator - .createStreamNameFromUri(((CloseWebSocketFrame) frame) - .reasonText()); - ListenerAdapter listener = Notificator.getListenerFor(streamName); - if (listener != null) { - listener.removeSubscriber(ctx.channel()); - logger.debug("Subscriber successfully registered."); - } - Notificator.removeListenerIfNoSubscriberExists(listener); - return; - } else if (frame instanceof PingWebSocketFrame) { - ctx.channel().write( - new PongWebSocketFrame(frame.content().retain())); - return; - } - } - - @Override - public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) - throws Exception { - if (cause instanceof java.nio.channels.ClosedChannelException == false) { - // cause.printStackTrace(); - } - ctx.close(); - } - - /** - * Get web socket location from HTTP request. - * - * @param req - * HTTP request from which the location will be returned - * @return String representation of web socket location. - */ - private static String getWebSocketLocation(HttpRequest req) { - return "http://" + req.headers().get(HOST) + req.getUri(); - } + private static final Logger logger = LoggerFactory.getLogger(WebSocketServerHandler.class); + + private WebSocketServerHandshaker handshaker; + + @Override + protected void channelRead0(final ChannelHandlerContext ctx, final Object msg) throws Exception { + if (msg instanceof FullHttpRequest) { + handleHttpRequest(ctx, (FullHttpRequest) msg); + } else if (msg instanceof WebSocketFrame) { + handleWebSocketFrame(ctx, (WebSocketFrame) msg); + } + } + + /** + * Checks if HTTP request method is GET and if is possible to decode HTTP result of request. + * + * @param ctx + * ChannelHandlerContext + * @param req + * FullHttpRequest + */ + private void handleHttpRequest(final ChannelHandlerContext ctx, final FullHttpRequest req) throws Exception { + // Handle a bad request. + if (!req.getDecoderResult().isSuccess()) { + sendHttpResponse(ctx, req, new DefaultFullHttpResponse(HTTP_1_1, BAD_REQUEST)); + return; + } + + // Allow only GET methods. + if (req.getMethod() != GET) { + sendHttpResponse(ctx, req, new DefaultFullHttpResponse(HTTP_1_1, FORBIDDEN)); + return; + } + + String streamName = Notificator.createStreamNameFromUri(req.getUri()); + ListenerAdapter listener = Notificator.getListenerFor(streamName); + if (listener != null) { + listener.addSubscriber(ctx.channel()); + logger.debug("Subscriber successfully registered."); + } else { + logger.error("Listener for stream with name '{}' was not found.", streamName); + sendHttpResponse(ctx, req, new DefaultFullHttpResponse(HTTP_1_1, INTERNAL_SERVER_ERROR)); + } + + // Handshake + WebSocketServerHandshakerFactory wsFactory = new WebSocketServerHandshakerFactory(getWebSocketLocation(req), + null, false); + handshaker = wsFactory.newHandshaker(req); + if (handshaker == null) { + WebSocketServerHandshakerFactory.sendUnsupportedVersionResponse(ctx.channel()); + } else { + handshaker.handshake(ctx.channel(), req); + } + + } + + /** + * Checks response status, send response and close connection if necessary + * + * @param ctx + * ChannelHandlerContext + * @param req + * HttpRequest + * @param res + * FullHttpResponse + */ + private static void sendHttpResponse(final ChannelHandlerContext ctx, final HttpRequest req, + final FullHttpResponse res) { + // Generate an error page if response getStatus code is not OK (200). + if (res.getStatus().code() != 200) { + ByteBuf buf = Unpooled.copiedBuffer(res.getStatus().toString(), CharsetUtil.UTF_8); + res.content().writeBytes(buf); + buf.release(); + setContentLength(res, res.content().readableBytes()); + } + + // Send the response and close the connection if necessary. + ChannelFuture f = ctx.channel().writeAndFlush(res); + if (!isKeepAlive(req) || res.getStatus().code() != 200) { + f.addListener(ChannelFutureListener.CLOSE); + } + } + + /** + * Handles web socket frame. + * + * @param ctx + * {@link ChannelHandlerContext} + * @param frame + * {@link WebSocketFrame} + */ + private void handleWebSocketFrame(final ChannelHandlerContext ctx, final WebSocketFrame frame) throws IOException { + if (frame instanceof CloseWebSocketFrame) { + handshaker.close(ctx.channel(), (CloseWebSocketFrame) frame.retain()); + String streamName = Notificator.createStreamNameFromUri(((CloseWebSocketFrame) frame).reasonText()); + ListenerAdapter listener = Notificator.getListenerFor(streamName); + if (listener != null) { + listener.removeSubscriber(ctx.channel()); + logger.debug("Subscriber successfully registered."); + } + Notificator.removeListenerIfNoSubscriberExists(listener); + return; + } else if (frame instanceof PingWebSocketFrame) { + ctx.channel().write(new PongWebSocketFrame(frame.content().retain())); + return; + } + } + + @Override + public void exceptionCaught(final ChannelHandlerContext ctx, final Throwable cause) throws Exception { + if (cause instanceof java.nio.channels.ClosedChannelException == false) { + // cause.printStackTrace(); + } + ctx.close(); + } + + /** + * Get web socket location from HTTP request. + * + * @param req + * HTTP request from which the location will be returned + * @return String representation of web socket location. + */ + private static String getWebSocketLocation(final HttpRequest req) { + return "http://" + req.headers().get(HOST) + req.getUri(); + } }