Fix device notification not working with websockets 57/108757/1
authorRobert Varga <robert.varga@pantheon.tech>
Mon, 30 Oct 2023 21:09:30 +0000 (22:09 +0100)
committerRobert Varga <robert.varga@pantheon.tech>
Mon, 30 Oct 2023 21:11:50 +0000 (22:11 +0100)
The routing of device notifications works only in through a hack in
RestconfDataStreamServiceImpl.

This patch refactors the implementation to properly assign a well-known
prefix, just as all the other adapters use, moving routing
appropriately.

JIRA: NETCONF-1102
Change-Id: I333a6d641d41a2e849b6004f8ec81020a9572283
Signed-off-by: Robert Varga <robert.varga@pantheon.tech>
restconf/restconf-nb/src/main/java/org/opendaylight/restconf/nb/rfc8040/JaxRsNorthbound.java
restconf/restconf-nb/src/main/java/org/opendaylight/restconf/nb/rfc8040/rests/services/impl/CreateStreamUtil.java
restconf/restconf-nb/src/main/java/org/opendaylight/restconf/nb/rfc8040/rests/services/impl/RestconfDataStreamServiceImpl.java
restconf/restconf-nb/src/main/java/org/opendaylight/restconf/nb/rfc8040/rests/utils/RestconfStreamsConstants.java
restconf/restconf-nb/src/main/java/org/opendaylight/restconf/nb/rfc8040/streams/listeners/ListenersBroker.java

index d3e2676a7951e7b115a9d50241669852e9757a6f..3a0d25815ba2762425220637fe9535841044b461 100644 (file)
@@ -7,11 +7,6 @@
  */
 package org.opendaylight.restconf.nb.rfc8040;
 
-import static org.opendaylight.restconf.nb.rfc8040.URLConstants.BASE_PATH;
-import static org.opendaylight.restconf.nb.rfc8040.URLConstants.SSE_SUBPATH;
-import static org.opendaylight.restconf.nb.rfc8040.rests.utils.RestconfStreamsConstants.DATA_SUBSCRIPTION;
-import static org.opendaylight.restconf.nb.rfc8040.rests.utils.RestconfStreamsConstants.NOTIFICATION_STREAM;
-
 import com.google.common.annotations.Beta;
 import javax.servlet.ServletException;
 import org.opendaylight.aaa.filterchain.configuration.CustomFilterAdapterConfiguration;
@@ -33,6 +28,7 @@ import org.opendaylight.mdsal.dom.api.DOMSchemaService;
 import org.opendaylight.restconf.nb.rfc8040.databind.DatabindProvider;
 import org.opendaylight.restconf.nb.rfc8040.rests.services.impl.MdsalRestconfServer;
 import org.opendaylight.restconf.nb.rfc8040.rests.services.impl.RestconfDataStreamServiceImpl;
+import org.opendaylight.restconf.nb.rfc8040.rests.utils.RestconfStreamsConstants;
 import org.opendaylight.restconf.nb.rfc8040.streams.StreamsConfiguration;
 import org.opendaylight.restconf.nb.rfc8040.streams.WebSocketInitializer;
 import org.opendaylight.restconf.nb.rfc8040.streams.listeners.ListenersBroker;
@@ -103,7 +99,7 @@ public final class JaxRsNorthbound implements AutoCloseable {
 
         final var restconfBuilder = WebContext.builder()
             .name("RFC8040 RESTCONF")
-            .contextPath("/" + BASE_PATH)
+            .contextPath("/" + URLConstants.BASE_PATH)
             .supportsSessions(false)
             .addServlet(ServletDetails.builder()
                 .addUrlPattern("/*")
@@ -114,7 +110,7 @@ public final class JaxRsNorthbound implements AutoCloseable {
                 .asyncSupported(true)
                 .build())
             .addServlet(ServletDetails.builder()
-                .addUrlPattern("/" + SSE_SUBPATH + "/*")
+                .addUrlPattern("/" + URLConstants.SSE_SUBPATH + "/*")
                 .servlet(servletSupport.createHttpServletBuilder(
                     new DataStreamApplication(databindProvider,
                         new RestconfDataStreamServiceImpl(scheduledThreadPool, listenersBroker, streamsConfiguration)))
@@ -123,8 +119,9 @@ public final class JaxRsNorthbound implements AutoCloseable {
                 .asyncSupported(true)
                 .build())
             .addServlet(ServletDetails.builder()
-                .addUrlPattern("/" + DATA_SUBSCRIPTION + "/*")
-                .addUrlPattern("/" + NOTIFICATION_STREAM + "/*")
+                .addUrlPattern("/" + RestconfStreamsConstants.DATA_SUBSCRIPTION + "/*")
+                .addUrlPattern("/" + RestconfStreamsConstants.NOTIFICATION_STREAM + "/*")
+                .addUrlPattern("/" + RestconfStreamsConstants.DEVICE_NOTIFICATION_STREAM + "/*")
                 .servlet(new WebSocketInitializer(scheduledThreadPool, listenersBroker, streamsConfiguration))
                 .build())
 
@@ -145,7 +142,8 @@ public final class JaxRsNorthbound implements AutoCloseable {
             .supportsSessions(false)
             .addServlet(ServletDetails.builder()
                 .addUrlPattern("/*")
-                .servlet(servletSupport.createHttpServletBuilder(new RootFoundApplication(BASE_PATH)).build())
+                .servlet(servletSupport.createHttpServletBuilder(new RootFoundApplication(URLConstants.BASE_PATH))
+                    .build())
                 .name("Rootfound")
                 .build());
 
index c7a25d1a6de3f479de2d3a633917d068c8c75649..5c5fe98a7873e7eac4b72e6752d2f8841ceb313e 100644 (file)
@@ -208,8 +208,7 @@ final class CreateStreamUtil {
         return RestconfFuture.of(Optional.of(Builders.containerBuilder()
             .withNodeIdentifier(new NodeIdentifier(SubscribeDeviceNotificationOutput.QNAME))
             .withChild(ImmutableNodes.leafNode(DEVICE_NOTIFICATION_STREAM_PATH,
-                baseUrl + notificationListenerAdapter.getStreamName() + "?"
-                    + RestconfStreamsConstants.NOTIFICATION_TYPE + "=" + RestconfStreamsConstants.DEVICE))
+                baseUrl + notificationListenerAdapter.getStreamName()))
             .build()));
     }
 
index 46ba9e78e5268036db4629a40f57166308a97c51..cac39e1f275b65b177431d9bd63ba1fe4f25671d 100644 (file)
@@ -19,15 +19,12 @@ import javax.ws.rs.PathParam;
 import javax.ws.rs.Produces;
 import javax.ws.rs.core.Context;
 import javax.ws.rs.core.MediaType;
-import javax.ws.rs.core.UriInfo;
 import javax.ws.rs.sse.Sse;
 import javax.ws.rs.sse.SseEventSink;
 import org.opendaylight.controller.config.threadpool.ScheduledThreadPool;
 import org.opendaylight.restconf.common.errors.RestconfDocumentedException;
-import org.opendaylight.restconf.nb.rfc8040.rests.utils.RestconfStreamsConstants;
 import org.opendaylight.restconf.nb.rfc8040.streams.SSESessionHandler;
 import org.opendaylight.restconf.nb.rfc8040.streams.StreamsConfiguration;
-import org.opendaylight.restconf.nb.rfc8040.streams.listeners.BaseListenerInterface;
 import org.opendaylight.restconf.nb.rfc8040.streams.listeners.ListenersBroker;
 import org.opendaylight.yangtools.yang.common.ErrorTag;
 import org.opendaylight.yangtools.yang.common.ErrorType;
@@ -60,36 +57,24 @@ public final class RestconfDataStreamServiceImpl {
      * Get target data resource.
      *
      * @param identifier path to target
-     * @param uriInfo URI info
      */
     @GET
     @Path("/{identifier:.+}")
     @Produces(MediaType.SERVER_SENT_EVENTS)
-    public void getSSE(@Encoded @PathParam("identifier") final String identifier, @Context final UriInfo uriInfo,
-            @Context final SseEventSink sink, @Context final Sse sse) {
-        final String streamName = ListenersBroker.createStreamNameFromUri(identifier);
-        final BaseListenerInterface listener;
-        final String notificaionType =
-            uriInfo.getQueryParameters().getFirst(RestconfStreamsConstants.NOTIFICATION_TYPE);
-        if (notificaionType != null && notificaionType.equals(RestconfStreamsConstants.DEVICE)) {
-            listener = listenersBroker.deviceNotificationListenerFor(streamName);
-            if (listener == null) {
-                LOG.debug("Listener for device path with name {} was not found.", streamName);
-                throw new RestconfDocumentedException("Data missing", ErrorType.APPLICATION, ErrorTag.DATA_MISSING);
-            }
-        } else {
-            listener = listenersBroker.listenerFor(streamName);
-            if (listener == null) {
-                LOG.debug("Listener for stream with name {} was not found.", streamName);
-                throw new RestconfDocumentedException("Data missing", ErrorType.APPLICATION, ErrorTag.DATA_MISSING);
-            }
+    public void getSSE(@Encoded @PathParam("identifier") final String identifier, @Context final SseEventSink sink,
+            @Context final Sse sse) {
+        final var streamName = ListenersBroker.createStreamNameFromUri(identifier);
+        final var listener = listenersBroker.listenerFor(streamName);
+        if (listener == null) {
+            LOG.debug("Listener for stream with name {} was not found.", streamName);
+            throw new RestconfDocumentedException("Data missing", ErrorType.APPLICATION, ErrorTag.DATA_MISSING);
         }
 
         LOG.debug("Listener for stream with name {} has been found, SSE session handler will be created.", streamName);
         // FIXME: invert control here: we should call 'listener.addSession()', which in turn should call
         //        handler.init()/handler.close()
-        final SSESessionHandler handler = new SSESessionHandler(executorService, sink, sse, listener,
-            maximumFragmentLength, heartbeatInterval);
+        final var handler = new SSESessionHandler(executorService, sink, sse, listener, maximumFragmentLength,
+            heartbeatInterval);
         handler.init();
     }
 }
index 26911539a926e9ea09ded7376c319311f64e2fc7..b3cffdd763f44c0048e7c9a136316825c505b12d 100644 (file)
@@ -14,16 +14,16 @@ public final class RestconfStreamsConstants {
     public static final String DATASTORE_PARAM_NAME = "datastore";
     public static final String SCOPE_PARAM_NAME = "scope";
 
+    // Prefixes for stream names
     public static final String DATA_SUBSCRIPTION = "data-change-event-subscription";
     public static final String NOTIFICATION_STREAM = "notification-stream";
+    public static final String DEVICE_NOTIFICATION_STREAM = "device-notification-stream";
 
     public static final String STREAMS_PATH = "ietf-restconf-monitoring:restconf-state/streams";
     public static final String STREAM_PATH_PART = "/stream=";
     public static final String STREAM_PATH = STREAMS_PATH + STREAM_PATH_PART;
     public static final String STREAM_ACCESS_PATH_PART = "/access=";
     public static final String STREAM_LOCATION_PATH_PART = "/location";
-    public static final String NOTIFICATION_TYPE = "notificationType";
-    public static final String DEVICE = "device";
 
     private RestconfStreamsConstants() {
         // Hidden on purpose
index 86ab5983e48e8bb0cd843ce3f472c0dcdec5b72e..84088d9612e48c5328d209e6d87ee1422f6d26a4 100644 (file)
@@ -104,17 +104,17 @@ public final class ListenersBroker {
     /**
      * Get listener for device path.
      *
-     * @param path name.
-     * @return {@link BaseListenerInterface} specified by stream name or {@code null} if listener with specified
-     *         stream name does not exist.
+     * @param streamName name.
+     * @return {@link DeviceNotificationListenerAdaptor} specified by stream name or {@code null} if listener with
+     *         specified stream name does not exist.
      * @throws NullPointerException in {@code path} is {@code null}
      */
-    public @Nullable BaseListenerInterface deviceNotificationListenerFor(final String path) {
-        requireNonNull(path);
+    public @Nullable DeviceNotificationListenerAdaptor deviceNotificationListenerFor(final String streamName) {
+        requireNonNull(streamName);
 
         final long stamp = deviceNotificationListenersLock.readLock();
         try {
-            return deviceNotificationListeners.get(path);
+            return deviceNotificationListeners.get(streamName);
         } finally {
             deviceNotificationListenersLock.unlockRead(stamp);
         }
@@ -132,6 +132,8 @@ public final class ListenersBroker {
             return notificationListenerFor(streamName);
         } else if (streamName.startsWith(RestconfStreamsConstants.DATA_SUBSCRIPTION)) {
             return dataChangeListenerFor(streamName);
+        } else if (streamName.startsWith(RestconfStreamsConstants.DEVICE_NOTIFICATION_STREAM)) {
+            return deviceNotificationListenerFor(streamName);
         } else {
             return null;
         }
@@ -223,10 +225,13 @@ public final class ListenersBroker {
     public DeviceNotificationListenerAdaptor registerDeviceNotificationListener(final String deviceName,
             final NotificationOutputType outputType, final EffectiveModelContext refSchemaCtx,
             final DOMMountPointService mountPointService, final YangInstanceIdentifier path) {
+        final var sb = new StringBuilder(RestconfStreamsConstants.DEVICE_NOTIFICATION_STREAM).append('/')
+            .append(deviceName);
+
         final long stamp = deviceNotificationListenersLock.writeLock();
         try {
-            return deviceNotificationListeners.computeIfAbsent(deviceName,
-                streamName -> new DeviceNotificationListenerAdaptor(deviceName, outputType, refSchemaCtx,
+            return deviceNotificationListeners.computeIfAbsent(sb.toString(),
+                streamName -> new DeviceNotificationListenerAdaptor(streamName, outputType, refSchemaCtx,
                     mountPointService, path, this));
         } finally {
             deviceNotificationListenersLock.unlockWrite(stamp);