import org.opendaylight.restconf.nb.rfc8040.legacy.InstanceIdentifierContext;
import org.opendaylight.restconf.nb.rfc8040.legacy.NormalizedNodePayload;
import org.opendaylight.restconf.nb.rfc8040.streams.StreamsConfiguration;
+import org.opendaylight.restconf.nb.rfc8040.streams.listeners.ListenersBroker;
import org.opendaylight.yang.gen.v1.urn.opendaylight.device.notification.rev221106.SubscribeDeviceNotification;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.remote.rev140114.CreateDataChangeEventSubscription;
import org.opendaylight.yangtools.yang.common.ErrorTag;
@Deprecated(forRemoval = true)
private final DOMMountPointService mountPointService;
private final SubscribeToStreamUtil streamUtils;
+ private final ListenersBroker listenersBroker;
public RestconfInvokeOperationsServiceImpl(final DatabindProvider databindProvider,
final MdsalRestconfServer server, final DOMMountPointService mountPointService,
- final StreamsConfiguration configuration) {
+ final ListenersBroker listenersBroker, final StreamsConfiguration configuration) {
this.databindProvider = requireNonNull(databindProvider);
this.server = requireNonNull(server);
this.mountPointService = requireNonNull(mountPointService);
- streamUtils = configuration.useSSE() ? SubscribeToStreamUtil.serverSentEvents()
- : SubscribeToStreamUtil.webSockets();
+ this.listenersBroker = requireNonNull(listenersBroker);
+ streamUtils = configuration.useSSE() ? SubscribeToStreamUtil.serverSentEvents(listenersBroker)
+ : SubscribeToStreamUtil.webSockets(listenersBroker);
}
/**
} else if (SubscribeDeviceNotification.QNAME.equals(type)) {
final var baseUrl = streamUtils.prepareUriByStreamName(uriInfo, "").toString();
return RestconfFuture.of(Optional.of(CreateStreamUtil.createDeviceNotificationListener(baseUrl, input,
- streamUtils, mountPointService)));
+ streamUtils, mountPointService, listenersBroker)));
}
}