Unify streams URI
[netconf.git] / restconf / restconf-nb / src / main / java / org / opendaylight / restconf / nb / rfc8040 / JaxRsNorthbound.java
index 6f5e2fead8fc53d21685eede4295661e86253fdf..058ac8449ed7dac32b14f17d4f1dc6aaa299d5cc 100644 (file)
@@ -7,13 +7,9 @@
  */
 package org.opendaylight.restconf.nb.rfc8040;
 
-import static org.opendaylight.restconf.nb.rfc8040.URLConstants.BASE_PATH;
-import static org.opendaylight.restconf.nb.rfc8040.URLConstants.SSE_SUBPATH;
-import static org.opendaylight.restconf.nb.rfc8040.rests.utils.RestconfStreamsConstants.DATA_SUBSCRIPTION;
-import static org.opendaylight.restconf.nb.rfc8040.rests.utils.RestconfStreamsConstants.NOTIFICATION_STREAM;
-
 import com.google.common.annotations.Beta;
 import javax.servlet.ServletException;
+import javax.servlet.http.HttpServlet;
 import org.opendaylight.aaa.filterchain.configuration.CustomFilterAdapterConfiguration;
 import org.opendaylight.aaa.filterchain.filters.CustomFilterAdapter;
 import org.opendaylight.aaa.web.FilterDetails;
@@ -31,7 +27,8 @@ import org.opendaylight.mdsal.dom.api.DOMNotificationService;
 import org.opendaylight.mdsal.dom.api.DOMRpcService;
 import org.opendaylight.mdsal.dom.api.DOMSchemaService;
 import org.opendaylight.restconf.nb.rfc8040.databind.DatabindProvider;
-import org.opendaylight.restconf.nb.rfc8040.rests.services.impl.RestconfDataStreamServiceImpl;
+import org.opendaylight.restconf.nb.rfc8040.rests.services.impl.MdsalRestconfServer;
+import org.opendaylight.restconf.nb.rfc8040.streams.ListenersBroker;
 import org.opendaylight.restconf.nb.rfc8040.streams.StreamsConfiguration;
 import org.opendaylight.restconf.nb.rfc8040.streams.WebSocketInitializer;
 import org.opendaylight.yangtools.concepts.Registration;
@@ -79,12 +76,12 @@ public final class JaxRsNorthbound implements AutoCloseable {
             @Reference final DOMMountPointService mountPointService,
             @Reference final DOMNotificationService notificationService, @Reference final DOMRpcService rpcService,
             @Reference final DOMSchemaService schemaService, @Reference final DatabindProvider databindProvider,
-            final Configuration configuration) throws ServletException {
+            @Reference final MdsalRestconfServer server, final Configuration configuration) throws ServletException {
         this(webServer, webContextSecurer, servletSupport, filterAdapterConfiguration, actionService, dataBroker,
-            mountPointService, notificationService, rpcService, schemaService, databindProvider,
+            mountPointService, notificationService, rpcService, schemaService, databindProvider, server,
             configuration.ping$_$executor$_$name$_$prefix(), configuration.max$_$thread$_$count(),
-            new StreamsConfiguration(configuration.maximum$_$fragment$_$length(), configuration.idle$_$timeout(),
-                configuration.heartbeat$_$interval(), configuration.use$_$sse()));
+            new StreamsConfiguration(configuration.maximum$_$fragment$_$length(),
+                configuration.idle$_$timeout(), configuration.heartbeat$_$interval(), configuration.use$_$sse()));
     }
 
     public JaxRsNorthbound(final WebServer webServer, final WebContextSecurer webContextSecurer,
@@ -92,36 +89,41 @@ public final class JaxRsNorthbound implements AutoCloseable {
             final DOMActionService actionService, final DOMDataBroker dataBroker,
             final DOMMountPointService mountPointService, final DOMNotificationService notificationService,
             final DOMRpcService rpcService, final DOMSchemaService schemaService,
-            final DatabindProvider databindProvider,
-            final String pingNamePrefix, final int pingMaxThreadCount,
-            final StreamsConfiguration streamsConfiguration) throws ServletException {
+            final DatabindProvider databindProvider, final MdsalRestconfServer server, final String pingNamePrefix,
+            final int pingMaxThreadCount, final StreamsConfiguration streamsConfiguration) throws ServletException {
         final var scheduledThreadPool = new ScheduledThreadPoolWrapper(pingMaxThreadCount,
             new NamingThreadPoolFactory(pingNamePrefix));
 
+        final ListenersBroker listenersBroker;
+        final HttpServlet streamServlet;
+        if (streamsConfiguration.useSSE()) {
+            listenersBroker = new ListenersBroker.ServerSentEvents();
+            streamServlet = servletSupport.createHttpServletBuilder(
+                new ServerSentEventsApplication(scheduledThreadPool, listenersBroker, streamsConfiguration))
+                .build();
+        } else {
+            listenersBroker = new ListenersBroker.WebSockets();
+            streamServlet = new WebSocketInitializer(scheduledThreadPool, listenersBroker, streamsConfiguration);
+        }
+
         final var restconfBuilder = WebContext.builder()
             .name("RFC8040 RESTCONF")
-            .contextPath("/" + BASE_PATH)
+            .contextPath("/" + URLConstants.BASE_PATH)
             .supportsSessions(false)
             .addServlet(ServletDetails.builder()
                 .addUrlPattern("/*")
                 .servlet(servletSupport.createHttpServletBuilder(
-                    new RestconfApplication(databindProvider, mountPointService, dataBroker, rpcService, actionService,
-                        notificationService, schemaService, streamsConfiguration)).build())
+                    new RestconfApplication(databindProvider, server, mountPointService, dataBroker, actionService,
+                        notificationService, schemaService, listenersBroker))
+                    .build())
                 .asyncSupported(true)
                 .build())
             .addServlet(ServletDetails.builder()
-                .addUrlPattern("/" + SSE_SUBPATH + "/*")
-                .servlet(servletSupport.createHttpServletBuilder(
-                    new DataStreamApplication(databindProvider, mountPointService,
-                        new RestconfDataStreamServiceImpl(scheduledThreadPool, streamsConfiguration))).build())
+                .addUrlPattern("/" + URLConstants.STREAMS_SUBPATH + "/*")
+                .servlet(streamServlet)
                 .name("notificationServlet")
                 .asyncSupported(true)
                 .build())
-            .addServlet(ServletDetails.builder()
-                .addUrlPattern("/" + DATA_SUBSCRIPTION + "/*")
-                .addUrlPattern("/" + NOTIFICATION_STREAM + "/*")
-                .servlet(new WebSocketInitializer(scheduledThreadPool, streamsConfiguration))
-                .build())
 
             // Allows user to add javax.servlet.Filter(s) in front of REST services
             .addFilter(FilterDetails.builder()
@@ -140,7 +142,8 @@ public final class JaxRsNorthbound implements AutoCloseable {
             .supportsSessions(false)
             .addServlet(ServletDetails.builder()
                 .addUrlPattern("/*")
-                .servlet(servletSupport.createHttpServletBuilder(new RootFoundApplication(BASE_PATH)).build())
+                .servlet(servletSupport.createHttpServletBuilder(new RootFoundApplication(URLConstants.BASE_PATH))
+                    .build())
                 .name("Rootfound")
                 .build());