import javax.ws.rs.core.UriInfo;
import org.eclipse.jdt.annotation.NonNull;
import org.opendaylight.mdsal.dom.api.DOMMountPoint;
+import org.opendaylight.mdsal.dom.api.DOMMountPointService;
import org.opendaylight.mdsal.dom.api.DOMRpcException;
import org.opendaylight.mdsal.dom.api.DOMRpcResult;
import org.opendaylight.mdsal.dom.api.DOMRpcService;
import org.opendaylight.restconf.common.errors.RestconfDocumentedException;
import org.opendaylight.restconf.nb.rfc8040.legacy.NormalizedNodePayload;
import org.opendaylight.restconf.nb.rfc8040.rests.services.api.RestconfInvokeOperationsService;
+import org.opendaylight.restconf.nb.rfc8040.streams.Configuration;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.device.notification.rev221106.SubscribeDeviceNotificationInput;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.remote.rev140114.CreateDataChangeEventSubscriptionInput;
import org.opendaylight.yangtools.yang.common.ErrorTag;
import org.opendaylight.yangtools.yang.common.ErrorType;
// FIXME: at some point we do not want to have this here, as this is only used for dispatch
private static final QNameModule SAL_REMOTE_NAMESPACE = CreateDataChangeEventSubscriptionInput.QNAME.getModule();
+ private static final QNameModule DEVICE_NOTIFICATION_NAMESPACE = SubscribeDeviceNotificationInput.QNAME.getModule();
private final DOMRpcService rpcService;
+ private final DOMMountPointService mountPointService;
+ private final SubscribeToStreamUtil streamUtils;
- public RestconfInvokeOperationsServiceImpl(final DOMRpcService rpcService) {
+ public RestconfInvokeOperationsServiceImpl(final DOMRpcService rpcService,
+ final DOMMountPointService mountPointService, final Configuration configuration) {
this.rpcService = requireNonNull(rpcService);
+ this.mountPointService = requireNonNull(mountPointService);
+ streamUtils = configuration.isUseSSE() ? SubscribeToStreamUtil.serverSentEvents()
+ : SubscribeToStreamUtil.webSockets();
}
@Override
future = Futures.immediateFailedFuture(new RestconfDocumentedException("Unsupported operation",
ErrorType.RPC, ErrorTag.OPERATION_NOT_SUPPORTED));
}
+ } else if (DEVICE_NOTIFICATION_NAMESPACE.equals(rpcName.getModule())) {
+ // FIXME: this should be a match on RPC QName
+ final String baseUrl = streamUtils.prepareUriByStreamName(uriInfo, "").toString();
+ future = Futures.immediateFuture(
+ CreateStreamUtil.createDeviceNotificationListener(baseUrl, payload, schemaContext, streamUtils,
+ mountPointService));
} else {
future = invokeRpc((ContainerNode)payload.getData(), rpcName, rpcService);
}