import com.google.common.annotations.Beta;
import javax.servlet.ServletException;
+import javax.servlet.http.HttpServlet;
import org.opendaylight.aaa.filterchain.configuration.CustomFilterAdapterConfiguration;
import org.opendaylight.aaa.filterchain.filters.CustomFilterAdapter;
import org.opendaylight.aaa.web.FilterDetails;
import org.opendaylight.mdsal.dom.api.DOMSchemaService;
import org.opendaylight.restconf.nb.rfc8040.databind.DatabindProvider;
import org.opendaylight.restconf.nb.rfc8040.rests.services.impl.MdsalRestconfServer;
-import org.opendaylight.restconf.nb.rfc8040.rests.services.impl.RestconfDataStreamServiceImpl;
-import org.opendaylight.restconf.nb.rfc8040.rests.services.impl.SubscribeToStreamUtil;
-import org.opendaylight.restconf.nb.rfc8040.rests.utils.RestconfStreamsConstants;
+import org.opendaylight.restconf.nb.rfc8040.streams.ListenersBroker;
import org.opendaylight.restconf.nb.rfc8040.streams.StreamsConfiguration;
import org.opendaylight.restconf.nb.rfc8040.streams.WebSocketInitializer;
-import org.opendaylight.restconf.nb.rfc8040.streams.listeners.ListenersBroker;
import org.opendaylight.yangtools.concepts.Registration;
import org.osgi.service.component.annotations.Activate;
import org.osgi.service.component.annotations.Component;
@Reference final DOMMountPointService mountPointService,
@Reference final DOMNotificationService notificationService, @Reference final DOMRpcService rpcService,
@Reference final DOMSchemaService schemaService, @Reference final DatabindProvider databindProvider,
- @Reference final MdsalRestconfServer server, @Reference final ListenersBroker listenersBroker,
- final Configuration configuration) throws ServletException {
+ @Reference final MdsalRestconfServer server, final Configuration configuration) throws ServletException {
this(webServer, webContextSecurer, servletSupport, filterAdapterConfiguration, actionService, dataBroker,
mountPointService, notificationService, rpcService, schemaService, databindProvider, server,
- listenersBroker, configuration.ping$_$executor$_$name$_$prefix(), configuration.max$_$thread$_$count(),
+ configuration.ping$_$executor$_$name$_$prefix(), configuration.max$_$thread$_$count(),
new StreamsConfiguration(configuration.maximum$_$fragment$_$length(),
configuration.idle$_$timeout(), configuration.heartbeat$_$interval(), configuration.use$_$sse()));
}
final DOMActionService actionService, final DOMDataBroker dataBroker,
final DOMMountPointService mountPointService, final DOMNotificationService notificationService,
final DOMRpcService rpcService, final DOMSchemaService schemaService,
- final DatabindProvider databindProvider, final MdsalRestconfServer server,
- final ListenersBroker listenersBroker, final String pingNamePrefix, final int pingMaxThreadCount,
- final StreamsConfiguration streamsConfiguration) throws ServletException {
+ final DatabindProvider databindProvider, final MdsalRestconfServer server, final String pingNamePrefix,
+ final int pingMaxThreadCount, final StreamsConfiguration streamsConfiguration) throws ServletException {
final var scheduledThreadPool = new ScheduledThreadPoolWrapper(pingMaxThreadCount,
new NamingThreadPoolFactory(pingNamePrefix));
- final SubscribeToStreamUtil streamUtils;
- final ServletDetails streamServlet;
+ final ListenersBroker listenersBroker;
+ final HttpServlet streamServlet;
if (streamsConfiguration.useSSE()) {
- streamUtils = SubscribeToStreamUtil.serverSentEvents(listenersBroker);
- streamServlet = ServletDetails.builder()
- .addUrlPattern("/" + URLConstants.SSE_SUBPATH + "/*")
- .servlet(servletSupport.createHttpServletBuilder(
- new DataStreamApplication(databindProvider,
- new RestconfDataStreamServiceImpl(scheduledThreadPool, listenersBroker, streamsConfiguration)))
- .build())
- .name("notificationServlet")
- .asyncSupported(true)
+ listenersBroker = new ListenersBroker.ServerSentEvents();
+ streamServlet = servletSupport.createHttpServletBuilder(
+ new ServerSentEventsApplication(scheduledThreadPool, listenersBroker, streamsConfiguration))
.build();
} else {
- streamUtils = SubscribeToStreamUtil.webSockets(listenersBroker);
- streamServlet = ServletDetails.builder()
- .addUrlPattern("/" + RestconfStreamsConstants.DATA_SUBSCRIPTION + "/*")
- .addUrlPattern("/" + RestconfStreamsConstants.NOTIFICATION_STREAM + "/*")
- .addUrlPattern("/" + RestconfStreamsConstants.DEVICE_NOTIFICATION_STREAM + "/*")
- .servlet(new WebSocketInitializer(scheduledThreadPool, listenersBroker, streamsConfiguration))
- .build();
+ listenersBroker = new ListenersBroker.WebSockets();
+ streamServlet = new WebSocketInitializer(scheduledThreadPool, listenersBroker, streamsConfiguration);
}
final var restconfBuilder = WebContext.builder()
.addUrlPattern("/*")
.servlet(servletSupport.createHttpServletBuilder(
new RestconfApplication(databindProvider, server, mountPointService, dataBroker, actionService,
- notificationService, schemaService, streamUtils))
+ notificationService, schemaService, listenersBroker))
.build())
.asyncSupported(true)
.build())
- .addServlet(streamServlet)
+ .addServlet(ServletDetails.builder()
+ .addUrlPattern("/" + URLConstants.STREAMS_SUBPATH + "/*")
+ .servlet(streamServlet)
+ .name("notificationServlet")
+ .asyncSupported(true)
+ .build())
// Allows user to add javax.servlet.Filter(s) in front of REST services
.addFilter(FilterDetails.builder()