2 * Copyright (c) 2014, 2015 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.restconf.nb.rfc8040.streams.websockets;
11 import io.netty.channel.ChannelFuture;
12 import io.netty.channel.ChannelFutureListener;
13 import io.netty.channel.ChannelHandlerContext;
14 import io.netty.channel.SimpleChannelInboundHandler;
15 import io.netty.handler.codec.http.DefaultFullHttpResponse;
16 import io.netty.handler.codec.http.FullHttpRequest;
17 import io.netty.handler.codec.http.FullHttpResponse;
18 import io.netty.handler.codec.http.HttpHeaderNames;
19 import io.netty.handler.codec.http.HttpMethod;
20 import io.netty.handler.codec.http.HttpRequest;
21 import io.netty.handler.codec.http.HttpResponseStatus;
22 import io.netty.handler.codec.http.HttpUtil;
23 import io.netty.handler.codec.http.HttpVersion;
24 import io.netty.handler.codec.http.websocketx.CloseWebSocketFrame;
25 import io.netty.handler.codec.http.websocketx.PingWebSocketFrame;
26 import io.netty.handler.codec.http.websocketx.PongWebSocketFrame;
27 import io.netty.handler.codec.http.websocketx.WebSocketFrame;
28 import io.netty.handler.codec.http.websocketx.WebSocketServerHandshaker;
29 import io.netty.handler.codec.http.websocketx.WebSocketServerHandshakerFactory;
30 import io.netty.util.CharsetUtil;
31 import java.util.Optional;
32 import org.opendaylight.restconf.nb.rfc8040.streams.listeners.ListenerAdapter;
33 import org.opendaylight.restconf.nb.rfc8040.streams.listeners.ListenersBroker;
34 import org.opendaylight.restconf.nb.rfc8040.streams.listeners.NotificationListenerAdapter;
35 import org.opendaylight.restconf.nb.rfc8040.utils.RestconfConstants;
36 import org.slf4j.Logger;
37 import org.slf4j.LoggerFactory;
40 * {@link WebSocketServerHandler} is implementation of {@link SimpleChannelInboundHandler} which allow handle
41 * {@link FullHttpRequest} and {@link WebSocketFrame} messages.
43 public class WebSocketServerHandler extends SimpleChannelInboundHandler<Object> {
44 private static final Logger LOG = LoggerFactory.getLogger(WebSocketServerHandler.class);
46 private WebSocketServerHandshaker handshaker;
49 protected void channelRead0(final ChannelHandlerContext ctx, final Object msg) {
50 if (msg instanceof FullHttpRequest) {
51 handleHttpRequest(ctx, (FullHttpRequest) msg);
52 } else if (msg instanceof WebSocketFrame) {
53 handleWebSocketFrame(ctx, (WebSocketFrame) msg);
58 * Checks if HTTP request method is GET and if is possible to decode HTTP result of request.
60 * @param ctx ChannelHandlerContext
61 * @param req FullHttpRequest
63 private void handleHttpRequest(final ChannelHandlerContext ctx, final FullHttpRequest req) {
64 // Handle a bad request.
65 if (!req.decoderResult().isSuccess()) {
66 sendHttpResponse(ctx, req, new DefaultFullHttpResponse(
67 HttpVersion.HTTP_1_1, HttpResponseStatus.BAD_REQUEST));
71 // Allow only GET methods.
72 if (req.method() != HttpMethod.GET) {
73 sendHttpResponse(ctx, req, new DefaultFullHttpResponse(
74 HttpVersion.HTTP_1_1, HttpResponseStatus.FORBIDDEN));
78 final String streamName = ListenersBroker.createStreamNameFromUri(req.uri());
79 if (streamName.contains(RestconfConstants.DATA_SUBSCR)) {
80 final Optional<ListenerAdapter> listener =
81 ListenersBroker.getInstance().getDataChangeListenerFor(streamName);
82 if (listener.isPresent()) {
83 listener.get().addSubscriber(ctx.channel());
84 LOG.debug("Subscriber successfully registered.");
86 LOG.error("Listener for stream with name '{}' was not found.", streamName);
87 sendHttpResponse(ctx, req, new DefaultFullHttpResponse(
88 HttpVersion.HTTP_1_1, HttpResponseStatus.INTERNAL_SERVER_ERROR));
90 } else if (streamName.contains(RestconfConstants.NOTIFICATION_STREAM)) {
91 final Optional<NotificationListenerAdapter> listener =
92 ListenersBroker.getInstance().getNotificationListenerFor(streamName);
93 if (listener.isPresent()) {
94 listener.get().addSubscriber(ctx.channel());
95 LOG.debug("Subscriber successfully registered.");
97 LOG.error("Listener for stream with name '{}' was not found.", streamName);
98 sendHttpResponse(ctx, req, new DefaultFullHttpResponse(
99 HttpVersion.HTTP_1_1, HttpResponseStatus.INTERNAL_SERVER_ERROR));
104 final WebSocketServerHandshakerFactory wsFactory =
105 new WebSocketServerHandshakerFactory(getWebSocketLocation(req),
107 this.handshaker = wsFactory.newHandshaker(req);
108 if (this.handshaker == null) {
109 WebSocketServerHandshakerFactory.sendUnsupportedVersionResponse(ctx.channel());
111 this.handshaker.handshake(ctx.channel(), req);
117 * Checks response status, send response and close connection if necessary.
119 * @param ctx ChannelHandlerContext
120 * @param req HttpRequest
121 * @param res FullHttpResponse
123 private static void sendHttpResponse(final ChannelHandlerContext ctx, final HttpRequest req,
124 final FullHttpResponse res) {
125 // Generate an error page if response getStatus code is not OK (200).
126 final boolean notOkay = !HttpResponseStatus.OK.equals(res.status());
128 res.content().writeCharSequence(res.status().toString(), CharsetUtil.UTF_8);
129 HttpUtil.setContentLength(res, res.content().readableBytes());
132 // Send the response and close the connection if necessary.
133 final ChannelFuture f = ctx.channel().writeAndFlush(res);
134 if (notOkay || !HttpUtil.isKeepAlive(req)) {
135 f.addListener(ChannelFutureListener.CLOSE);
140 * Handles web socket frame.
142 * @param ctx {@link ChannelHandlerContext}
143 * @param frame {@link WebSocketFrame}
145 private void handleWebSocketFrame(final ChannelHandlerContext ctx, final WebSocketFrame frame) {
146 if (frame instanceof CloseWebSocketFrame) {
147 this.handshaker.close(ctx.channel(), (CloseWebSocketFrame) frame.retain());
148 final String streamName = ListenersBroker.createStreamNameFromUri(
149 ((CloseWebSocketFrame) frame).reasonText());
150 if (streamName.contains(RestconfConstants.DATA_SUBSCR)) {
151 final Optional<ListenerAdapter> listener = ListenersBroker.getInstance()
152 .getDataChangeListenerFor(streamName);
153 if (listener.isPresent()) {
154 listener.get().removeSubscriber(ctx.channel());
155 LOG.debug("Subscriber successfully removed.");
156 if (!listener.get().hasSubscribers()) {
157 ListenersBroker.getInstance().removeAndCloseDataChangeListener(listener.get());
160 } else if (streamName.contains(RestconfConstants.NOTIFICATION_STREAM)) {
161 final Optional<NotificationListenerAdapter> listener
162 = ListenersBroker.getInstance().getNotificationListenerFor(streamName);
163 if (listener.isPresent()) {
164 listener.get().removeSubscriber(ctx.channel());
165 LOG.debug("Subscriber successfully removed.");
166 if (!listener.get().hasSubscribers()) {
167 ListenersBroker.getInstance().removeAndCloseNotificationListener(listener.get());
171 } else if (frame instanceof PingWebSocketFrame) {
172 ctx.channel().writeAndFlush(new PongWebSocketFrame(frame.content().retain()));
177 public void exceptionCaught(final ChannelHandlerContext ctx, final Throwable cause) {
182 * Get web socket location from HTTP request.
184 * @param req HTTP request from which the location will be returned.
185 * @return String representation of web socket location.
187 private static String getWebSocketLocation(final HttpRequest req) {
188 return "ws://" + req.headers().get(HttpHeaderNames.HOST) + req.uri();