import org.opendaylight.controller.config.threadpool.ScheduledThreadPool;
import org.opendaylight.restconf.common.errors.RestconfDocumentedException;
import org.opendaylight.restconf.nb.rfc8040.rests.services.api.RestconfDataStreamService;
+import org.opendaylight.restconf.nb.rfc8040.rests.utils.RestconfStreamsConstants;
import org.opendaylight.restconf.nb.rfc8040.streams.Configuration;
import org.opendaylight.restconf.nb.rfc8040.streams.SSESessionHandler;
import org.opendaylight.restconf.nb.rfc8040.streams.listeners.BaseListenerInterface;
@Override
public void getSSE(final String identifier, final UriInfo uriInfo, final SseEventSink sink, final Sse sse) {
final String streamName = ListenersBroker.createStreamNameFromUri(identifier);
- final BaseListenerInterface listener = listenersBroker.getListenerFor(streamName)
- .orElseThrow(() -> {
- LOG.debug("Listener for stream with name {} was not found.", streamName);
- throw new RestconfDocumentedException("Data missing", ErrorType.APPLICATION, ErrorTag.DATA_MISSING);
- });
+ final BaseListenerInterface listener;
+ final String notificaionType =
+ uriInfo.getQueryParameters().getFirst(RestconfStreamsConstants.NOTIFICATION_TYPE);
+ if (notificaionType != null && notificaionType.equals(RestconfStreamsConstants.DEVICE)) {
+ listener = listenersBroker.getDeviceNotificationListenerFor(streamName)
+ .orElseThrow(() -> {
+ LOG.debug("Listener for device path with name {} was not found.", streamName);
+ throw new RestconfDocumentedException("Data missing", ErrorType.APPLICATION,
+ ErrorTag.DATA_MISSING);
+ });
+ } else {
+ listener = listenersBroker.getListenerFor(streamName)
+ .orElseThrow(() -> {
+ LOG.debug("Listener for stream with name {} was not found.", streamName);
+ throw new RestconfDocumentedException("Data missing", ErrorType.APPLICATION, ErrorTag.DATA_MISSING);
+ });
+ }
LOG.debug("Listener for stream with name {} has been found, SSE session handler will be created.", streamName);
-
// FIXME: invert control here: we should call 'listener.addSession()', which in turn should call
// handler.init()/handler.close()
final SSESessionHandler handler = new SSESessionHandler(executorService, sink, sse, listener,