2 * Copyright © 2019 FRINX s.r.o. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.restconf.nb.rfc8040.streams;
10 import com.google.common.annotations.VisibleForTesting;
11 import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
12 import java.util.concurrent.ScheduledExecutorService;
13 import javax.inject.Inject;
14 import javax.inject.Singleton;
15 import javax.servlet.http.HttpServletResponse;
16 import org.eclipse.jetty.websocket.servlet.ServletUpgradeRequest;
17 import org.eclipse.jetty.websocket.servlet.ServletUpgradeResponse;
18 import org.eclipse.jetty.websocket.servlet.WebSocketCreator;
19 import org.eclipse.jetty.websocket.servlet.WebSocketServlet;
20 import org.eclipse.jetty.websocket.servlet.WebSocketServletFactory;
21 import org.opendaylight.controller.config.threadpool.ScheduledThreadPool;
22 import org.opendaylight.restconf.nb.rfc8040.streams.listeners.ListenersBroker;
23 import org.slf4j.Logger;
24 import org.slf4j.LoggerFactory;
27 * Web-socket servlet listening on ws or wss schemas for created data-change-event or notification streams.
30 public final class WebSocketInitializer extends WebSocketServlet {
31 private static final long serialVersionUID = 1L;
33 @SuppressFBWarnings(value = "SE_BAD_FIELD",
34 justification = "Servlet/WebSocket bridge, we need this service for heartbeats")
35 private final ScheduledExecutorService executorService;
36 private final int maximumFragmentLength;
37 private final int heartbeatInterval;
38 private final int idleTimeoutMillis;
41 * Creation of the web-socket initializer.
43 * @param scheduledThreadPool ODL thread pool used for fetching of scheduled executors.
44 * @param configuration Web-socket configuration holder.
47 public WebSocketInitializer(final ScheduledThreadPool scheduledThreadPool, final Configuration configuration) {
48 executorService = scheduledThreadPool.getExecutor();
49 maximumFragmentLength = configuration.getMaximumFragmentLength();
50 heartbeatInterval = configuration.getHeartbeatInterval();
51 idleTimeoutMillis = configuration.getIdleTimeout();
55 * Configuration of the web-socket factory - idle timeout and specified factory object.
57 * @param factory Configurable web-socket factory.
60 public void configure(final WebSocketServletFactory factory) {
61 factory.getPolicy().setIdleTimeout(idleTimeoutMillis);
62 factory.setCreator(new WebSocketFactory(executorService, maximumFragmentLength, heartbeatInterval));
66 * Factory that is used for creation of new web-sockets based on HTTP/HTTPS upgrade request.
69 static final class WebSocketFactory implements WebSocketCreator {
70 private static final Logger LOG = LoggerFactory.getLogger(WebSocketFactory.class);
72 private final ScheduledExecutorService executorService;
73 // FIXME: inject this reference
74 private final ListenersBroker listenersBroker = ListenersBroker.getInstance();
75 private final int maximumFragmentLength;
76 private final int heartbeatInterval;
79 * Creation of the web-socket factory.
81 * @param executorService Executor for creation of threads for controlling of web-socket sessions.
82 * @param maximumFragmentLength Maximum web-socket fragment length in number of Unicode code units (characters)
83 * (exceeded message length leads to fragmentation of messages).
84 * @param heartbeatInterval Interval in milliseconds between sending of ping control frames.
86 WebSocketFactory(final ScheduledExecutorService executorService, final int maximumFragmentLength,
87 final int heartbeatInterval) {
88 this.executorService = executorService;
89 this.maximumFragmentLength = maximumFragmentLength;
90 this.heartbeatInterval = heartbeatInterval;
94 * Creation of the new web-socket based on input HTTP/HTTPS upgrade request. Web-socket is created only if the
95 * data listener for input URI can be found (results in status code 101); otherwise status code 404 is set
96 * in upgrade response.
98 * @param servletUpgradeRequest Upgrade request.
99 * @param servletUpgradeResponse Upgrade response.
100 * @return Created web-socket instance or {@code null} if the web-socket cannot be created.
103 public Object createWebSocket(final ServletUpgradeRequest servletUpgradeRequest,
104 final ServletUpgradeResponse servletUpgradeResponse) {
105 final var streamName = ListenersBroker.createStreamNameFromUri(
106 servletUpgradeRequest.getRequestURI().getRawPath());
108 final var listener = listenersBroker.listenerFor(streamName);
109 if (listener == null) {
110 LOG.debug("Listener for stream with name {} was not found.", streamName);
111 servletUpgradeResponse.setSuccess(false);
112 servletUpgradeResponse.setStatusCode(HttpServletResponse.SC_NOT_FOUND);
116 LOG.debug("Listener for stream with name {} has been found, web-socket session handler will be created",
118 servletUpgradeResponse.setSuccess(true);
119 servletUpgradeResponse.setStatusCode(HttpServletResponse.SC_SWITCHING_PROTOCOLS);
120 // note: every web-socket manages PING process individually because this approach scales better than
121 // sending of PING frames at once over all web-socket sessions
122 return new WebSocketSessionHandler(executorService, listener, maximumFragmentLength, heartbeatInterval);