Unify streams URI
[netconf.git] / restconf / restconf-nb / src / main / java / org / opendaylight / restconf / nb / rfc8040 / JaxRsNorthbound.java
index 2042e29d061d8e6d5a623f405c6d6df056bf6139..058ac8449ed7dac32b14f17d4f1dc6aaa299d5cc 100644 (file)
@@ -9,6 +9,7 @@ package org.opendaylight.restconf.nb.rfc8040;
 
 import com.google.common.annotations.Beta;
 import javax.servlet.ServletException;
+import javax.servlet.http.HttpServlet;
 import org.opendaylight.aaa.filterchain.configuration.CustomFilterAdapterConfiguration;
 import org.opendaylight.aaa.filterchain.filters.CustomFilterAdapter;
 import org.opendaylight.aaa.web.FilterDetails;
@@ -27,12 +28,9 @@ import org.opendaylight.mdsal.dom.api.DOMRpcService;
 import org.opendaylight.mdsal.dom.api.DOMSchemaService;
 import org.opendaylight.restconf.nb.rfc8040.databind.DatabindProvider;
 import org.opendaylight.restconf.nb.rfc8040.rests.services.impl.MdsalRestconfServer;
-import org.opendaylight.restconf.nb.rfc8040.rests.services.impl.RestconfDataStreamServiceImpl;
-import org.opendaylight.restconf.nb.rfc8040.rests.services.impl.SubscribeToStreamUtil;
-import org.opendaylight.restconf.nb.rfc8040.rests.utils.RestconfStreamsConstants;
+import org.opendaylight.restconf.nb.rfc8040.streams.ListenersBroker;
 import org.opendaylight.restconf.nb.rfc8040.streams.StreamsConfiguration;
 import org.opendaylight.restconf.nb.rfc8040.streams.WebSocketInitializer;
-import org.opendaylight.restconf.nb.rfc8040.streams.listeners.ListenersBroker;
 import org.opendaylight.yangtools.concepts.Registration;
 import org.osgi.service.component.annotations.Activate;
 import org.osgi.service.component.annotations.Component;
@@ -78,11 +76,10 @@ public final class JaxRsNorthbound implements AutoCloseable {
             @Reference final DOMMountPointService mountPointService,
             @Reference final DOMNotificationService notificationService, @Reference final DOMRpcService rpcService,
             @Reference final DOMSchemaService schemaService, @Reference final DatabindProvider databindProvider,
-            @Reference final MdsalRestconfServer server, @Reference final ListenersBroker listenersBroker,
-            final Configuration configuration) throws ServletException {
+            @Reference final MdsalRestconfServer server, final Configuration configuration) throws ServletException {
         this(webServer, webContextSecurer, servletSupport, filterAdapterConfiguration, actionService, dataBroker,
             mountPointService, notificationService, rpcService, schemaService, databindProvider, server,
-            listenersBroker, configuration.ping$_$executor$_$name$_$prefix(), configuration.max$_$thread$_$count(),
+            configuration.ping$_$executor$_$name$_$prefix(), configuration.max$_$thread$_$count(),
             new StreamsConfiguration(configuration.maximum$_$fragment$_$length(),
                 configuration.idle$_$timeout(), configuration.heartbeat$_$interval(), configuration.use$_$sse()));
     }
@@ -92,33 +89,21 @@ public final class JaxRsNorthbound implements AutoCloseable {
             final DOMActionService actionService, final DOMDataBroker dataBroker,
             final DOMMountPointService mountPointService, final DOMNotificationService notificationService,
             final DOMRpcService rpcService, final DOMSchemaService schemaService,
-            final DatabindProvider databindProvider, final MdsalRestconfServer server,
-            final ListenersBroker listenersBroker, final String pingNamePrefix, final int pingMaxThreadCount,
-            final StreamsConfiguration streamsConfiguration) throws ServletException {
+            final DatabindProvider databindProvider, final MdsalRestconfServer server, final String pingNamePrefix,
+            final int pingMaxThreadCount, final StreamsConfiguration streamsConfiguration) throws ServletException {
         final var scheduledThreadPool = new ScheduledThreadPoolWrapper(pingMaxThreadCount,
             new NamingThreadPoolFactory(pingNamePrefix));
 
-        final SubscribeToStreamUtil streamUtils;
-        final ServletDetails streamServlet;
+        final ListenersBroker listenersBroker;
+        final HttpServlet streamServlet;
         if (streamsConfiguration.useSSE()) {
-            streamUtils = SubscribeToStreamUtil.serverSentEvents(listenersBroker);
-            streamServlet = ServletDetails.builder()
-                .addUrlPattern("/" + URLConstants.SSE_SUBPATH + "/*")
-                .servlet(servletSupport.createHttpServletBuilder(
-                    new DataStreamApplication(databindProvider,
-                        new RestconfDataStreamServiceImpl(scheduledThreadPool, listenersBroker, streamsConfiguration)))
-                    .build())
-                .name("notificationServlet")
-                .asyncSupported(true)
+            listenersBroker = new ListenersBroker.ServerSentEvents();
+            streamServlet = servletSupport.createHttpServletBuilder(
+                new ServerSentEventsApplication(scheduledThreadPool, listenersBroker, streamsConfiguration))
                 .build();
         } else {
-            streamUtils = SubscribeToStreamUtil.webSockets(listenersBroker);
-            streamServlet = ServletDetails.builder()
-                .addUrlPattern("/" + RestconfStreamsConstants.DATA_SUBSCRIPTION + "/*")
-                .addUrlPattern("/" + RestconfStreamsConstants.NOTIFICATION_STREAM + "/*")
-                .addUrlPattern("/" + RestconfStreamsConstants.DEVICE_NOTIFICATION_STREAM + "/*")
-                .servlet(new WebSocketInitializer(scheduledThreadPool, listenersBroker, streamsConfiguration))
-                .build();
+            listenersBroker = new ListenersBroker.WebSockets();
+            streamServlet = new WebSocketInitializer(scheduledThreadPool, listenersBroker, streamsConfiguration);
         }
 
         final var restconfBuilder = WebContext.builder()
@@ -129,11 +114,16 @@ public final class JaxRsNorthbound implements AutoCloseable {
                 .addUrlPattern("/*")
                 .servlet(servletSupport.createHttpServletBuilder(
                     new RestconfApplication(databindProvider, server, mountPointService, dataBroker, actionService,
-                        notificationService, schemaService, streamUtils))
+                        notificationService, schemaService, listenersBroker))
                     .build())
                 .asyncSupported(true)
                 .build())
-            .addServlet(streamServlet)
+            .addServlet(ServletDetails.builder()
+                .addUrlPattern("/" + URLConstants.STREAMS_SUBPATH + "/*")
+                .servlet(streamServlet)
+                .name("notificationServlet")
+                .asyncSupported(true)
+                .build())
 
             // Allows user to add javax.servlet.Filter(s) in front of REST services
             .addFilter(FilterDetails.builder()