import javax.inject.Inject;
import javax.inject.Singleton;
import javax.ws.rs.core.UriInfo;
-import org.glassfish.jersey.media.sse.EventOutput;
+import javax.ws.rs.sse.Sse;
+import javax.ws.rs.sse.SseEventSink;
+import org.opendaylight.controller.config.threadpool.ScheduledThreadPool;
import org.opendaylight.restconf.common.errors.RestconfDocumentedException;
-import org.opendaylight.restconf.common.errors.RestconfError.ErrorTag;
-import org.opendaylight.restconf.common.errors.RestconfError.ErrorType;
import org.opendaylight.restconf.nb.rfc8040.rests.services.api.RestconfDataStreamService;
+import org.opendaylight.restconf.nb.rfc8040.streams.Configuration;
+import org.opendaylight.restconf.nb.rfc8040.streams.SSESessionHandler;
import org.opendaylight.restconf.nb.rfc8040.streams.listeners.BaseListenerInterface;
import org.opendaylight.restconf.nb.rfc8040.streams.listeners.ListenersBroker;
-import org.opendaylight.restconf.nb.rfc8040.streams.sse.SSEInitializer;
-import org.opendaylight.restconf.nb.rfc8040.streams.sse.SSESessionHandler;
+import org.opendaylight.yangtools.yang.common.ErrorTag;
+import org.opendaylight.yangtools.yang.common.ErrorType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
private final int heartbeatInterval;
@Inject
- public RestconfDataStreamServiceImpl(final SSEInitializer configuration) {
- executorService = configuration.getExecutorService();
+ public RestconfDataStreamServiceImpl(final ScheduledThreadPool scheduledThreadPool,
+ final Configuration configuration) {
+ executorService = scheduledThreadPool.getExecutor();
heartbeatInterval = configuration.getHeartbeatInterval();
maximumFragmentLength = configuration.getMaximumFragmentLength();
}
@Override
- public EventOutput getSSE(final String identifier, final UriInfo uriInfo) {
+ public void getSSE(final String identifier, final UriInfo uriInfo, final SseEventSink sink, final Sse sse) {
final String streamName = ListenersBroker.createStreamNameFromUri(identifier);
final BaseListenerInterface listener = listenersBroker.getListenerFor(streamName)
.orElseThrow(() -> {
});
LOG.debug("Listener for stream with name {} has been found, SSE session handler will be created.", streamName);
- final EventOutput eventOutput = new EventOutput();
- final SSESessionHandler handler = new SSESessionHandler(executorService, eventOutput, listener,
+
+ // FIXME: invert control here: we should call 'listener.addSession()', which in turn should call
+ // handler.init()/handler.close()
+ final SSESessionHandler handler = new SSESessionHandler(executorService, sink, sse, listener,
maximumFragmentLength, heartbeatInterval);
handler.init();
- return eventOutput;
}
}