Turn ListenersBroker into a component
[netconf.git] / restconf / restconf-nb / src / main / java / org / opendaylight / restconf / nb / rfc8040 / rests / services / impl / RestconfInvokeOperationsServiceImpl.java
index 0fc3d26c115d91a00c54cb97031880f8e682befa..03fbe8e93bd202817378848ab5c66b2f99491b10 100644 (file)
@@ -39,6 +39,7 @@ import org.opendaylight.restconf.nb.rfc8040.databind.XmlOperationInputBody;
 import org.opendaylight.restconf.nb.rfc8040.legacy.InstanceIdentifierContext;
 import org.opendaylight.restconf.nb.rfc8040.legacy.NormalizedNodePayload;
 import org.opendaylight.restconf.nb.rfc8040.streams.StreamsConfiguration;
+import org.opendaylight.restconf.nb.rfc8040.streams.listeners.ListenersBroker;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.device.notification.rev221106.SubscribeDeviceNotification;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.remote.rev140114.CreateDataChangeEventSubscription;
 import org.opendaylight.yangtools.yang.common.ErrorTag;
@@ -60,15 +61,17 @@ public final class RestconfInvokeOperationsServiceImpl {
     @Deprecated(forRemoval = true)
     private final DOMMountPointService mountPointService;
     private final SubscribeToStreamUtil streamUtils;
+    private final ListenersBroker listenersBroker;
 
     public RestconfInvokeOperationsServiceImpl(final DatabindProvider databindProvider,
             final MdsalRestconfServer server, final DOMMountPointService mountPointService,
-            final StreamsConfiguration configuration) {
+            final ListenersBroker listenersBroker, final StreamsConfiguration configuration) {
         this.databindProvider = requireNonNull(databindProvider);
         this.server = requireNonNull(server);
         this.mountPointService = requireNonNull(mountPointService);
-        streamUtils = configuration.useSSE() ? SubscribeToStreamUtil.serverSentEvents()
-            : SubscribeToStreamUtil.webSockets();
+        this.listenersBroker = requireNonNull(listenersBroker);
+        streamUtils = configuration.useSSE() ? SubscribeToStreamUtil.serverSentEvents(listenersBroker)
+            : SubscribeToStreamUtil.webSockets(listenersBroker);
     }
 
     /**
@@ -177,7 +180,7 @@ public final class RestconfInvokeOperationsServiceImpl {
             } else if (SubscribeDeviceNotification.QNAME.equals(type)) {
                 final var baseUrl = streamUtils.prepareUriByStreamName(uriInfo, "").toString();
                 return RestconfFuture.of(Optional.of(CreateStreamUtil.createDeviceNotificationListener(baseUrl, input,
-                    streamUtils, mountPointService)));
+                    streamUtils, mountPointService, listenersBroker)));
             }
         }