Capture ListenersBroker instances
[netconf.git] / restconf / restconf-nb / src / main / java / org / opendaylight / restconf / nb / rfc8040 / rests / services / impl / CreateStreamUtil.java
index 3c96b213ac22af8e42b08f5d4e1602bd141e467b..8e5502928b97bb28c576edbb620ae38d7bf8c42e 100644 (file)
@@ -23,7 +23,6 @@ import org.opendaylight.restconf.common.errors.RestconfDocumentedException;
 import org.opendaylight.restconf.nb.rfc8040.rests.utils.RestconfStreamsConstants;
 import org.opendaylight.restconf.nb.rfc8040.streams.listeners.DeviceNotificationListenerAdaptor;
 import org.opendaylight.restconf.nb.rfc8040.streams.listeners.ListenersBroker;
-import org.opendaylight.restconf.nb.rfc8040.streams.listeners.NotificationListenerAdapter;
 import org.opendaylight.restconf.nb.rfc8040.utils.parser.IdentifierCodec;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.device.notification.rev221106.SubscribeDeviceNotificationInput;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.device.notification.rev221106.SubscribeDeviceNotificationOutput;
@@ -45,8 +44,6 @@ import org.opendaylight.yangtools.yang.data.api.schema.LeafNode;
 import org.opendaylight.yangtools.yang.data.impl.schema.Builders;
 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
 import org.opendaylight.yangtools.yang.model.api.EffectiveModelContext;
-import org.opendaylight.yangtools.yang.model.api.Module;
-import org.opendaylight.yangtools.yang.model.api.NotificationDefinition;
 import org.opendaylight.yangtools.yang.model.api.stmt.NotificationEffectiveStatement;
 import org.opendaylight.yangtools.yang.model.api.stmt.SchemaNodeIdentifier.Absolute;
 
@@ -111,7 +108,7 @@ final class CreateStreamUtil {
      *     </pre>
      */
     // FIXME: this really should be a normal RPC implementation
-    static DOMRpcResult createDataChangeNotifiStream(final ContainerNode input,
+    static DOMRpcResult createDataChangeNotifiStream(final ListenersBroker listenersBroker, final ContainerNode input,
             final EffectiveModelContext refSchemaCtx) {
         // parsing out of container with settings and path
         final YangInstanceIdentifier path = preparePath(input);
@@ -126,7 +123,7 @@ final class CreateStreamUtil {
         final String streamName = streamNameBuilder.toString();
 
         // registration of the listener
-        ListenersBroker.getInstance().registerDataChangeListener(path, streamName, outputType);
+        listenersBroker.registerDataChangeListener(path, streamName, outputType);
 
         // building of output
         return new DefaultDOMRpcResult(Builders.containerBuilder()
@@ -185,7 +182,7 @@ final class CreateStreamUtil {
                 ErrorTag.OPERATION_FAILED);
         }
 
-        final DeviceNotificationListenerAdaptor notificationListenerAdapter = ListenersBroker.getInstance()
+        final DeviceNotificationListenerAdaptor notificationListenerAdapter = streamUtil.listenersBroker()
             .registerDeviceNotificationListener(deviceName, prepareOutputType(input), mountModelContext,
                 mountPointService, mountPoint.getIdentifier());
         notificationListenerAdapter.listen(mountNotifService, notificationPaths);
@@ -254,45 +251,4 @@ final class CreateStreamUtil {
         return data.childByArg(childName) instanceof LeafNode<?> leafNode && leafNode.body() instanceof String str
             ? str : null;
     }
-
-    /**
-     * Create YANG notification stream using notification definition in YANG schema.
-     *
-     * @param notificationDefinition YANG notification definition.
-     * @param refSchemaCtx           Reference to {@link EffectiveModelContext}
-     * @param outputType             Output type (XML or JSON).
-     * @return {@link NotificationListenerAdapter}
-     */
-    static NotificationListenerAdapter createYangNotifiStream(final NotificationDefinition notificationDefinition,
-            final EffectiveModelContext refSchemaCtx, final NotificationOutputType outputType) {
-        final var streamName = parseNotificationStreamName(requireNonNull(notificationDefinition),
-                requireNonNull(refSchemaCtx), requireNonNull(outputType));
-        final var listenersBroker = ListenersBroker.getInstance();
-
-        final var existing = listenersBroker.notificationListenerFor(streamName);
-        return existing != null ? existing
-            : listenersBroker.registerNotificationListener(
-                Absolute.of(notificationDefinition.getQName()), streamName, outputType);
-    }
-
-    private static String parseNotificationStreamName(final NotificationDefinition notificationDefinition,
-            final EffectiveModelContext refSchemaCtx, final NotificationOutputType outputType) {
-        final QName notificationDefinitionQName = notificationDefinition.getQName();
-        final Module module = refSchemaCtx.findModule(
-                notificationDefinitionQName.getModule().getNamespace(),
-                notificationDefinitionQName.getModule().getRevision()).orElse(null);
-        requireNonNull(module, String.format("Module for namespace %s does not exist.",
-                notificationDefinitionQName.getModule().getNamespace()));
-
-        final StringBuilder streamNameBuilder = new StringBuilder();
-        streamNameBuilder.append(RestconfStreamsConstants.NOTIFICATION_STREAM)
-                .append('/')
-                .append(module.getName())
-                .append(':')
-                .append(notificationDefinitionQName.getLocalName());
-        if (outputType != NotificationOutputType.XML) {
-            streamNameBuilder.append('/').append(outputType.getName());
-        }
-        return streamNameBuilder.toString();
-    }
 }