Refactoring of web-sockets in RESTCONF RFC-8040
[netconf.git] / restconf / restconf-nb-rfc8040 / src / main / java / org / opendaylight / restconf / nb / rfc8040 / streams / websockets / WebSocketServerHandler.java
1 /*
2  * Copyright (c) 2014, 2015 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.restconf.nb.rfc8040.streams.websockets;
10
11 import io.netty.channel.ChannelFuture;
12 import io.netty.channel.ChannelFutureListener;
13 import io.netty.channel.ChannelHandlerContext;
14 import io.netty.channel.SimpleChannelInboundHandler;
15 import io.netty.handler.codec.http.DefaultFullHttpResponse;
16 import io.netty.handler.codec.http.FullHttpRequest;
17 import io.netty.handler.codec.http.FullHttpResponse;
18 import io.netty.handler.codec.http.HttpHeaderNames;
19 import io.netty.handler.codec.http.HttpMethod;
20 import io.netty.handler.codec.http.HttpRequest;
21 import io.netty.handler.codec.http.HttpResponseStatus;
22 import io.netty.handler.codec.http.HttpUtil;
23 import io.netty.handler.codec.http.HttpVersion;
24 import io.netty.handler.codec.http.websocketx.CloseWebSocketFrame;
25 import io.netty.handler.codec.http.websocketx.PingWebSocketFrame;
26 import io.netty.handler.codec.http.websocketx.PongWebSocketFrame;
27 import io.netty.handler.codec.http.websocketx.WebSocketFrame;
28 import io.netty.handler.codec.http.websocketx.WebSocketServerHandshaker;
29 import io.netty.handler.codec.http.websocketx.WebSocketServerHandshakerFactory;
30 import io.netty.util.CharsetUtil;
31 import java.util.Optional;
32 import org.opendaylight.restconf.nb.rfc8040.streams.listeners.ListenerAdapter;
33 import org.opendaylight.restconf.nb.rfc8040.streams.listeners.ListenersBroker;
34 import org.opendaylight.restconf.nb.rfc8040.streams.listeners.NotificationListenerAdapter;
35 import org.opendaylight.restconf.nb.rfc8040.utils.RestconfConstants;
36 import org.slf4j.Logger;
37 import org.slf4j.LoggerFactory;
38
39 /**
40  * {@link WebSocketServerHandler} is implementation of {@link SimpleChannelInboundHandler} which allow handle
41  * {@link FullHttpRequest} and {@link WebSocketFrame} messages.
42  */
43 public class WebSocketServerHandler extends SimpleChannelInboundHandler<Object> {
44     private static final Logger LOG = LoggerFactory.getLogger(WebSocketServerHandler.class);
45
46     private WebSocketServerHandshaker handshaker;
47
48     @Override
49     protected void channelRead0(final ChannelHandlerContext ctx, final Object msg) {
50         if (msg instanceof FullHttpRequest) {
51             handleHttpRequest(ctx, (FullHttpRequest) msg);
52         } else if (msg instanceof WebSocketFrame) {
53             handleWebSocketFrame(ctx, (WebSocketFrame) msg);
54         }
55     }
56
57     /**
58      * Checks if HTTP request method is GET and if is possible to decode HTTP result of request.
59      *
60      * @param ctx ChannelHandlerContext
61      * @param req FullHttpRequest
62      */
63     private void handleHttpRequest(final ChannelHandlerContext ctx, final FullHttpRequest req) {
64         // Handle a bad request.
65         if (!req.decoderResult().isSuccess()) {
66             sendHttpResponse(ctx, req, new DefaultFullHttpResponse(
67                     HttpVersion.HTTP_1_1, HttpResponseStatus.BAD_REQUEST));
68             return;
69         }
70
71         // Allow only GET methods.
72         if (req.method() != HttpMethod.GET) {
73             sendHttpResponse(ctx, req, new DefaultFullHttpResponse(
74                     HttpVersion.HTTP_1_1, HttpResponseStatus.FORBIDDEN));
75             return;
76         }
77
78         final String streamName = ListenersBroker.createStreamNameFromUri(req.uri());
79         if (streamName.contains(RestconfConstants.DATA_SUBSCR)) {
80             final Optional<ListenerAdapter> listener =
81                     ListenersBroker.getInstance().getDataChangeListenerFor(streamName);
82             if (listener.isPresent()) {
83                 listener.get().addSubscriber(ctx.channel());
84                 LOG.debug("Subscriber successfully registered.");
85             } else {
86                 LOG.error("Listener for stream with name '{}' was not found.", streamName);
87                 sendHttpResponse(ctx, req, new DefaultFullHttpResponse(
88                         HttpVersion.HTTP_1_1, HttpResponseStatus.INTERNAL_SERVER_ERROR));
89             }
90         } else if (streamName.contains(RestconfConstants.NOTIFICATION_STREAM)) {
91             final Optional<NotificationListenerAdapter> listener =
92                     ListenersBroker.getInstance().getNotificationListenerFor(streamName);
93             if (listener.isPresent()) {
94                 listener.get().addSubscriber(ctx.channel());
95                 LOG.debug("Subscriber successfully registered.");
96             } else {
97                 LOG.error("Listener for stream with name '{}' was not found.", streamName);
98                 sendHttpResponse(ctx, req, new DefaultFullHttpResponse(
99                         HttpVersion.HTTP_1_1, HttpResponseStatus.INTERNAL_SERVER_ERROR));
100             }
101         }
102
103         // Handshake
104         final WebSocketServerHandshakerFactory wsFactory =
105                 new WebSocketServerHandshakerFactory(getWebSocketLocation(req),
106                         null, false);
107         this.handshaker = wsFactory.newHandshaker(req);
108         if (this.handshaker == null) {
109             WebSocketServerHandshakerFactory.sendUnsupportedVersionResponse(ctx.channel());
110         } else {
111             this.handshaker.handshake(ctx.channel(), req);
112         }
113
114     }
115
116     /**
117      * Checks response status, send response and close connection if necessary.
118      *
119      * @param ctx ChannelHandlerContext
120      * @param req HttpRequest
121      * @param res FullHttpResponse
122      */
123     private static void sendHttpResponse(final ChannelHandlerContext ctx, final HttpRequest req,
124                                          final FullHttpResponse res) {
125         // Generate an error page if response getStatus code is not OK (200).
126         final boolean notOkay = !HttpResponseStatus.OK.equals(res.status());
127         if (notOkay) {
128             res.content().writeCharSequence(res.status().toString(), CharsetUtil.UTF_8);
129             HttpUtil.setContentLength(res, res.content().readableBytes());
130         }
131
132         // Send the response and close the connection if necessary.
133         final ChannelFuture f = ctx.channel().writeAndFlush(res);
134         if (notOkay || !HttpUtil.isKeepAlive(req)) {
135             f.addListener(ChannelFutureListener.CLOSE);
136         }
137     }
138
139     /**
140      * Handles web socket frame.
141      *
142      * @param ctx {@link ChannelHandlerContext}
143      * @param frame {@link WebSocketFrame}
144      */
145     private void handleWebSocketFrame(final ChannelHandlerContext ctx, final WebSocketFrame frame) {
146         if (frame instanceof CloseWebSocketFrame) {
147             this.handshaker.close(ctx.channel(), (CloseWebSocketFrame) frame.retain());
148             final String streamName = ListenersBroker.createStreamNameFromUri(
149                     ((CloseWebSocketFrame) frame).reasonText());
150             if (streamName.contains(RestconfConstants.DATA_SUBSCR)) {
151                 final Optional<ListenerAdapter> listener = ListenersBroker.getInstance()
152                         .getDataChangeListenerFor(streamName);
153                 if (listener.isPresent()) {
154                     listener.get().removeSubscriber(ctx.channel());
155                     LOG.debug("Subscriber successfully removed.");
156                     if (!listener.get().hasSubscribers()) {
157                         ListenersBroker.getInstance().removeAndCloseDataChangeListener(listener.get());
158                     }
159                 }
160             } else if (streamName.contains(RestconfConstants.NOTIFICATION_STREAM)) {
161                 final Optional<NotificationListenerAdapter> listener
162                         = ListenersBroker.getInstance().getNotificationListenerFor(streamName);
163                 if (listener.isPresent()) {
164                     listener.get().removeSubscriber(ctx.channel());
165                     LOG.debug("Subscriber successfully removed.");
166                     if (!listener.get().hasSubscribers()) {
167                         ListenersBroker.getInstance().removeAndCloseNotificationListener(listener.get());
168                     }
169                 }
170             }
171         } else if (frame instanceof PingWebSocketFrame) {
172             ctx.channel().writeAndFlush(new PongWebSocketFrame(frame.content().retain()));
173         }
174     }
175
176     @Override
177     public void exceptionCaught(final ChannelHandlerContext ctx, final Throwable cause) {
178         ctx.close();
179     }
180
181     /**
182      * Get web socket location from HTTP request.
183      *
184      * @param req HTTP request from which the location will be returned.
185      * @return String representation of web socket location.
186      */
187     private static String getWebSocketLocation(final HttpRequest req) {
188         return "ws://" + req.headers().get(HttpHeaderNames.HOST) + req.uri();
189     }
190 }