Merge "Fixed for bug : 1171 - issue while creating subnet"
[controller.git] / opendaylight / md-sal / sal-rest-connector / src / main / java / org / opendaylight / controller / sal / streams / websockets / WebSocketServerHandler.java
1 package org.opendaylight.controller.sal.streams.websockets;
2
3 import static io.netty.handler.codec.http.HttpHeaders.isKeepAlive;
4 import static io.netty.handler.codec.http.HttpHeaders.setContentLength;
5 import static io.netty.handler.codec.http.HttpHeaders.Names.HOST;
6 import static io.netty.handler.codec.http.HttpMethod.GET;
7 import static io.netty.handler.codec.http.HttpResponseStatus.BAD_REQUEST;
8 import static io.netty.handler.codec.http.HttpResponseStatus.FORBIDDEN;
9 import static io.netty.handler.codec.http.HttpResponseStatus.INTERNAL_SERVER_ERROR;
10 import static io.netty.handler.codec.http.HttpVersion.HTTP_1_1;
11
12 import io.netty.buffer.ByteBuf;
13 import io.netty.buffer.Unpooled;
14 import io.netty.channel.ChannelFuture;
15 import io.netty.channel.ChannelFutureListener;
16 import io.netty.channel.ChannelHandlerContext;
17 import io.netty.channel.SimpleChannelInboundHandler;
18 import io.netty.handler.codec.http.DefaultFullHttpResponse;
19 import io.netty.handler.codec.http.FullHttpRequest;
20 import io.netty.handler.codec.http.FullHttpResponse;
21 import io.netty.handler.codec.http.HttpRequest;
22 import io.netty.handler.codec.http.websocketx.CloseWebSocketFrame;
23 import io.netty.handler.codec.http.websocketx.PingWebSocketFrame;
24 import io.netty.handler.codec.http.websocketx.PongWebSocketFrame;
25 import io.netty.handler.codec.http.websocketx.WebSocketFrame;
26 import io.netty.handler.codec.http.websocketx.WebSocketServerHandshaker;
27 import io.netty.handler.codec.http.websocketx.WebSocketServerHandshakerFactory;
28 import io.netty.util.CharsetUtil;
29 import java.io.IOException;
30 import org.opendaylight.controller.sal.streams.listeners.ListenerAdapter;
31 import org.opendaylight.controller.sal.streams.listeners.Notificator;
32 import org.slf4j.Logger;
33 import org.slf4j.LoggerFactory;
34
35 /**
36  * {@link WebSocketServerHandler} is implementation of {@link SimpleChannelInboundHandler} which allow handle
37  * {@link FullHttpRequest} and {@link WebSocketFrame} messages.
38  */
39 public class WebSocketServerHandler extends SimpleChannelInboundHandler<Object> {
40
41     private static final Logger logger = LoggerFactory.getLogger(WebSocketServerHandler.class);
42
43     private WebSocketServerHandshaker handshaker;
44
45     @Override
46     protected void channelRead0(final ChannelHandlerContext ctx, final Object msg) throws Exception {
47         if (msg instanceof FullHttpRequest) {
48             handleHttpRequest(ctx, (FullHttpRequest) msg);
49         } else if (msg instanceof WebSocketFrame) {
50             handleWebSocketFrame(ctx, (WebSocketFrame) msg);
51         }
52     }
53
54     /**
55      * Checks if HTTP request method is GET and if is possible to decode HTTP result of request.
56      *
57      * @param ctx
58      *            ChannelHandlerContext
59      * @param req
60      *            FullHttpRequest
61      */
62     private void handleHttpRequest(final ChannelHandlerContext ctx, final FullHttpRequest req) throws Exception {
63         // Handle a bad request.
64         if (!req.getDecoderResult().isSuccess()) {
65             sendHttpResponse(ctx, req, new DefaultFullHttpResponse(HTTP_1_1, BAD_REQUEST));
66             return;
67         }
68
69         // Allow only GET methods.
70         if (req.getMethod() != GET) {
71             sendHttpResponse(ctx, req, new DefaultFullHttpResponse(HTTP_1_1, FORBIDDEN));
72             return;
73         }
74
75         String streamName = Notificator.createStreamNameFromUri(req.getUri());
76         ListenerAdapter listener = Notificator.getListenerFor(streamName);
77         if (listener != null) {
78             listener.addSubscriber(ctx.channel());
79             logger.debug("Subscriber successfully registered.");
80         } else {
81             logger.error("Listener for stream with name '{}' was not found.", streamName);
82             sendHttpResponse(ctx, req, new DefaultFullHttpResponse(HTTP_1_1, INTERNAL_SERVER_ERROR));
83         }
84
85         // Handshake
86         WebSocketServerHandshakerFactory wsFactory = new WebSocketServerHandshakerFactory(getWebSocketLocation(req),
87                 null, false);
88         handshaker = wsFactory.newHandshaker(req);
89         if (handshaker == null) {
90             WebSocketServerHandshakerFactory.sendUnsupportedVersionResponse(ctx.channel());
91         } else {
92             handshaker.handshake(ctx.channel(), req);
93         }
94
95     }
96
97     /**
98      * Checks response status, send response and close connection if necessary
99      *
100      * @param ctx
101      *            ChannelHandlerContext
102      * @param req
103      *            HttpRequest
104      * @param res
105      *            FullHttpResponse
106      */
107     private static void sendHttpResponse(final ChannelHandlerContext ctx, final HttpRequest req,
108             final FullHttpResponse res) {
109         // Generate an error page if response getStatus code is not OK (200).
110         if (res.getStatus().code() != 200) {
111             ByteBuf buf = Unpooled.copiedBuffer(res.getStatus().toString(), CharsetUtil.UTF_8);
112             res.content().writeBytes(buf);
113             buf.release();
114             setContentLength(res, res.content().readableBytes());
115         }
116
117         // Send the response and close the connection if necessary.
118         ChannelFuture f = ctx.channel().writeAndFlush(res);
119         if (!isKeepAlive(req) || res.getStatus().code() != 200) {
120             f.addListener(ChannelFutureListener.CLOSE);
121         }
122     }
123
124     /**
125      * Handles web socket frame.
126      *
127      * @param ctx
128      *            {@link ChannelHandlerContext}
129      * @param frame
130      *            {@link WebSocketFrame}
131      */
132     private void handleWebSocketFrame(final ChannelHandlerContext ctx, final WebSocketFrame frame) throws IOException {
133         if (frame instanceof CloseWebSocketFrame) {
134             handshaker.close(ctx.channel(), (CloseWebSocketFrame) frame.retain());
135             String streamName = Notificator.createStreamNameFromUri(((CloseWebSocketFrame) frame).reasonText());
136             ListenerAdapter listener = Notificator.getListenerFor(streamName);
137             if (listener != null) {
138                 listener.removeSubscriber(ctx.channel());
139                 logger.debug("Subscriber successfully registered.");
140             }
141             Notificator.removeListenerIfNoSubscriberExists(listener);
142             return;
143         } else if (frame instanceof PingWebSocketFrame) {
144             ctx.channel().write(new PongWebSocketFrame(frame.content().retain()));
145             return;
146         }
147     }
148
149     @Override
150     public void exceptionCaught(final ChannelHandlerContext ctx, final Throwable cause) throws Exception {
151         if (cause instanceof java.nio.channels.ClosedChannelException == false) {
152             // cause.printStackTrace();
153         }
154         ctx.close();
155     }
156
157     /**
158      * Get web socket location from HTTP request.
159      *
160      * @param req
161      *            HTTP request from which the location will be returned
162      * @return String representation of web socket location.
163      */
164     private static String getWebSocketLocation(final HttpRequest req) {
165         return "http://" + req.headers().get(HOST) + req.getUri();
166     }
167
168 }