1 package org.opendaylight.controller.sal.streams.websockets;
3 import static io.netty.handler.codec.http.HttpHeaders.isKeepAlive;
4 import static io.netty.handler.codec.http.HttpHeaders.setContentLength;
5 import static io.netty.handler.codec.http.HttpHeaders.Names.HOST;
6 import static io.netty.handler.codec.http.HttpMethod.GET;
7 import static io.netty.handler.codec.http.HttpResponseStatus.BAD_REQUEST;
8 import static io.netty.handler.codec.http.HttpResponseStatus.FORBIDDEN;
9 import static io.netty.handler.codec.http.HttpResponseStatus.INTERNAL_SERVER_ERROR;
10 import static io.netty.handler.codec.http.HttpVersion.HTTP_1_1;
12 import io.netty.buffer.ByteBuf;
13 import io.netty.buffer.Unpooled;
14 import io.netty.channel.ChannelFuture;
15 import io.netty.channel.ChannelFutureListener;
16 import io.netty.channel.ChannelHandlerContext;
17 import io.netty.channel.SimpleChannelInboundHandler;
18 import io.netty.handler.codec.http.DefaultFullHttpResponse;
19 import io.netty.handler.codec.http.FullHttpRequest;
20 import io.netty.handler.codec.http.FullHttpResponse;
21 import io.netty.handler.codec.http.HttpRequest;
22 import io.netty.handler.codec.http.websocketx.CloseWebSocketFrame;
23 import io.netty.handler.codec.http.websocketx.PingWebSocketFrame;
24 import io.netty.handler.codec.http.websocketx.PongWebSocketFrame;
25 import io.netty.handler.codec.http.websocketx.WebSocketFrame;
26 import io.netty.handler.codec.http.websocketx.WebSocketServerHandshaker;
27 import io.netty.handler.codec.http.websocketx.WebSocketServerHandshakerFactory;
28 import io.netty.util.CharsetUtil;
29 import java.io.IOException;
30 import org.opendaylight.controller.sal.streams.listeners.ListenerAdapter;
31 import org.opendaylight.controller.sal.streams.listeners.Notificator;
32 import org.slf4j.Logger;
33 import org.slf4j.LoggerFactory;
36 * {@link WebSocketServerHandler} is implementation of {@link SimpleChannelInboundHandler} which allow handle
37 * {@link FullHttpRequest} and {@link WebSocketFrame} messages.
39 public class WebSocketServerHandler extends SimpleChannelInboundHandler<Object> {
41 private static final Logger logger = LoggerFactory.getLogger(WebSocketServerHandler.class);
43 private WebSocketServerHandshaker handshaker;
46 protected void channelRead0(final ChannelHandlerContext ctx, final Object msg) throws Exception {
47 if (msg instanceof FullHttpRequest) {
48 handleHttpRequest(ctx, (FullHttpRequest) msg);
49 } else if (msg instanceof WebSocketFrame) {
50 handleWebSocketFrame(ctx, (WebSocketFrame) msg);
55 * Checks if HTTP request method is GET and if is possible to decode HTTP result of request.
58 * ChannelHandlerContext
62 private void handleHttpRequest(final ChannelHandlerContext ctx, final FullHttpRequest req) throws Exception {
63 // Handle a bad request.
64 if (!req.getDecoderResult().isSuccess()) {
65 sendHttpResponse(ctx, req, new DefaultFullHttpResponse(HTTP_1_1, BAD_REQUEST));
69 // Allow only GET methods.
70 if (req.getMethod() != GET) {
71 sendHttpResponse(ctx, req, new DefaultFullHttpResponse(HTTP_1_1, FORBIDDEN));
75 String streamName = Notificator.createStreamNameFromUri(req.getUri());
76 ListenerAdapter listener = Notificator.getListenerFor(streamName);
77 if (listener != null) {
78 listener.addSubscriber(ctx.channel());
79 logger.debug("Subscriber successfully registered.");
81 logger.error("Listener for stream with name '{}' was not found.", streamName);
82 sendHttpResponse(ctx, req, new DefaultFullHttpResponse(HTTP_1_1, INTERNAL_SERVER_ERROR));
86 WebSocketServerHandshakerFactory wsFactory = new WebSocketServerHandshakerFactory(getWebSocketLocation(req),
88 handshaker = wsFactory.newHandshaker(req);
89 if (handshaker == null) {
90 WebSocketServerHandshakerFactory.sendUnsupportedVersionResponse(ctx.channel());
92 handshaker.handshake(ctx.channel(), req);
98 * Checks response status, send response and close connection if necessary
101 * ChannelHandlerContext
107 private static void sendHttpResponse(final ChannelHandlerContext ctx, final HttpRequest req,
108 final FullHttpResponse res) {
109 // Generate an error page if response getStatus code is not OK (200).
110 if (res.getStatus().code() != 200) {
111 ByteBuf buf = Unpooled.copiedBuffer(res.getStatus().toString(), CharsetUtil.UTF_8);
112 res.content().writeBytes(buf);
114 setContentLength(res, res.content().readableBytes());
117 // Send the response and close the connection if necessary.
118 ChannelFuture f = ctx.channel().writeAndFlush(res);
119 if (!isKeepAlive(req) || res.getStatus().code() != 200) {
120 f.addListener(ChannelFutureListener.CLOSE);
125 * Handles web socket frame.
128 * {@link ChannelHandlerContext}
130 * {@link WebSocketFrame}
132 private void handleWebSocketFrame(final ChannelHandlerContext ctx, final WebSocketFrame frame) throws IOException {
133 if (frame instanceof CloseWebSocketFrame) {
134 handshaker.close(ctx.channel(), (CloseWebSocketFrame) frame.retain());
135 String streamName = Notificator.createStreamNameFromUri(((CloseWebSocketFrame) frame).reasonText());
136 ListenerAdapter listener = Notificator.getListenerFor(streamName);
137 if (listener != null) {
138 listener.removeSubscriber(ctx.channel());
139 logger.debug("Subscriber successfully registered.");
141 Notificator.removeListenerIfNoSubscriberExists(listener);
143 } else if (frame instanceof PingWebSocketFrame) {
144 ctx.channel().write(new PongWebSocketFrame(frame.content().retain()));
150 public void exceptionCaught(final ChannelHandlerContext ctx, final Throwable cause) throws Exception {
151 if (cause instanceof java.nio.channels.ClosedChannelException == false) {
152 // cause.printStackTrace();
158 * Get web socket location from HTTP request.
161 * HTTP request from which the location will be returned
162 * @return String representation of web socket location.
164 private static String getWebSocketLocation(final HttpRequest req) {
165 return "http://" + req.headers().get(HOST) + req.getUri();