* terms of the Eclipse Public License v1.0 which accompanies this distribution,
* and is available at http://www.eclipse.org/legal/epl-v10.html
*/
-package org.opendaylight.restconf.nb.rfc8040.rests.services.impl;
+package org.opendaylight.restconf.nb.rfc8040.streams;
import static java.util.Objects.requireNonNull;
import com.google.common.collect.ImmutableMap;
import java.io.UnsupportedEncodingException;
-import java.util.concurrent.ScheduledExecutorService;
import javax.ws.rs.BadRequestException;
import javax.ws.rs.GET;
import javax.ws.rs.NotFoundException;
import javax.ws.rs.sse.Sse;
import javax.ws.rs.sse.SseEventSink;
import javax.xml.xpath.XPathExpressionException;
-import org.opendaylight.controller.config.threadpool.ScheduledThreadPool;
import org.opendaylight.restconf.nb.rfc8040.ReceiveEventsParams;
-import org.opendaylight.restconf.nb.rfc8040.streams.ListenersBroker;
-import org.opendaylight.restconf.nb.rfc8040.streams.RestconfStream.EncodingName;
-import org.opendaylight.restconf.nb.rfc8040.streams.SSESessionHandler;
-import org.opendaylight.restconf.nb.rfc8040.streams.StreamsConfiguration;
+import org.opendaylight.restconf.server.spi.RestconfStream;
+import org.opendaylight.restconf.server.spi.RestconfStream.EncodingName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
* Access to notification streams via Server-Sent Events.
*/
@Path("/")
-public final class RestconfDataStreamServiceImpl {
- private static final Logger LOG = LoggerFactory.getLogger(RestconfDataStreamServiceImpl.class);
+final class SSEStreamService {
+ private static final Logger LOG = LoggerFactory.getLogger(SSEStreamService.class);
- private final ListenersBroker listenersBroker;
- private final ScheduledExecutorService executorService;
+ private final RestconfStream.Registry streamRegistry;
+ private final PingExecutor pingExecutor;
private final int maximumFragmentLength;
private final int heartbeatInterval;
- public RestconfDataStreamServiceImpl(final ScheduledThreadPool scheduledThreadPool,
- final ListenersBroker listenersBroker, final StreamsConfiguration configuration) {
- executorService = scheduledThreadPool.getExecutor();
- this.listenersBroker = requireNonNull(listenersBroker);
+ SSEStreamService(final RestconfStream.Registry streamRegistry, final PingExecutor pingExecutor,
+ final StreamsConfiguration configuration) {
+ this.streamRegistry = requireNonNull(streamRegistry);
+ this.pingExecutor = requireNonNull(pingExecutor);
heartbeatInterval = configuration.heartbeatInterval();
maximumFragmentLength = configuration.maximumFragmentLength();
}
public void getSSE(@PathParam("encodingName") final EncodingName encodingName,
@PathParam("streamName") final String streamName, @Context final UriInfo uriInfo,
@Context final SseEventSink sink, @Context final Sse sse) {
- final var stream = listenersBroker.getStream(streamName);
+ final var stream = streamRegistry.lookupStream(streamName);
if (stream == null) {
LOG.debug("Listener for stream with name {} was not found.", streamName);
throw new NotFoundException("No such stream: " + streamName);
LOG.debug("Listener for stream with name {} has been found, SSE session handler will be created.", streamName);
// FIXME: invert control here: we should call 'listener.addSession()', which in turn should call
// handler.init()/handler.close()
- final var handler = new SSESessionHandler(executorService, sink, sse, stream, encodingName, params,
+ final var handler = new SSESender(pingExecutor, sink, sse, stream, encodingName, params,
maximumFragmentLength, heartbeatInterval);
try {