Netconf Device Notification
[netconf.git] / restconf / restconf-nb / src / main / java / org / opendaylight / restconf / nb / rfc8040 / rests / services / impl / RestconfDataStreamServiceImpl.java
index 0862e2eab2d87ce3c1d999c8b01dcf59b0717470..8307f9138e2d504c9380e6df385caca9e8d1397b 100644 (file)
@@ -16,6 +16,7 @@ import javax.ws.rs.sse.SseEventSink;
 import org.opendaylight.controller.config.threadpool.ScheduledThreadPool;
 import org.opendaylight.restconf.common.errors.RestconfDocumentedException;
 import org.opendaylight.restconf.nb.rfc8040.rests.services.api.RestconfDataStreamService;
+import org.opendaylight.restconf.nb.rfc8040.rests.utils.RestconfStreamsConstants;
 import org.opendaylight.restconf.nb.rfc8040.streams.Configuration;
 import org.opendaylight.restconf.nb.rfc8040.streams.SSESessionHandler;
 import org.opendaylight.restconf.nb.rfc8040.streams.listeners.BaseListenerInterface;
@@ -48,14 +49,25 @@ public class RestconfDataStreamServiceImpl implements RestconfDataStreamService
     @Override
     public void getSSE(final String identifier, final UriInfo uriInfo, final SseEventSink sink, final Sse sse) {
         final String streamName = ListenersBroker.createStreamNameFromUri(identifier);
-        final BaseListenerInterface listener = listenersBroker.getListenerFor(streamName)
-            .orElseThrow(() -> {
-                LOG.debug("Listener for stream with name {} was not found.", streamName);
-                throw new RestconfDocumentedException("Data missing", ErrorType.APPLICATION, ErrorTag.DATA_MISSING);
-            });
+        final BaseListenerInterface listener;
+        final String notificaionType =
+            uriInfo.getQueryParameters().getFirst(RestconfStreamsConstants.NOTIFICATION_TYPE);
+        if (notificaionType != null && notificaionType.equals(RestconfStreamsConstants.DEVICE)) {
+            listener = listenersBroker.getDeviceNotificationListenerFor(streamName)
+                .orElseThrow(() -> {
+                    LOG.debug("Listener for device path with name {} was not found.", streamName);
+                    throw new RestconfDocumentedException("Data missing", ErrorType.APPLICATION,
+                        ErrorTag.DATA_MISSING);
+                });
+        } else {
+            listener = listenersBroker.getListenerFor(streamName)
+                .orElseThrow(() -> {
+                    LOG.debug("Listener for stream with name {} was not found.", streamName);
+                    throw new RestconfDocumentedException("Data missing", ErrorType.APPLICATION, ErrorTag.DATA_MISSING);
+                });
+        }
 
         LOG.debug("Listener for stream with name {} has been found, SSE session handler will be created.", streamName);
-
         // FIXME: invert control here: we should call 'listener.addSession()', which in turn should call
         //        handler.init()/handler.close()
         final SSESessionHandler handler = new SSESessionHandler(executorService, sink, sse, listener,