import static java.util.Objects.requireNonNull;
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Throwables;
-import com.google.common.util.concurrent.FutureCallback;
-import com.google.common.util.concurrent.Futures;
-import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.MoreExecutors;
-import java.util.List;
-import java.util.concurrent.ExecutionException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Optional;
+import javax.ws.rs.Consumes;
+import javax.ws.rs.Encoded;
+import javax.ws.rs.POST;
import javax.ws.rs.Path;
-import javax.ws.rs.WebApplicationException;
+import javax.ws.rs.PathParam;
+import javax.ws.rs.Produces;
import javax.ws.rs.container.AsyncResponse;
-import javax.ws.rs.core.Response.Status;
+import javax.ws.rs.container.Suspended;
+import javax.ws.rs.core.Context;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
import javax.ws.rs.core.UriInfo;
-import org.eclipse.jdt.annotation.NonNull;
-import org.opendaylight.mdsal.dom.api.DOMMountPoint;
import org.opendaylight.mdsal.dom.api.DOMMountPointService;
-import org.opendaylight.mdsal.dom.api.DOMRpcException;
-import org.opendaylight.mdsal.dom.api.DOMRpcResult;
-import org.opendaylight.mdsal.dom.api.DOMRpcService;
-import org.opendaylight.mdsal.dom.spi.DefaultDOMRpcResult;
-import org.opendaylight.restconf.common.context.InstanceIdentifierContext;
import org.opendaylight.restconf.common.errors.RestconfDocumentedException;
+import org.opendaylight.restconf.common.errors.RestconfFuture;
+import org.opendaylight.restconf.nb.rfc8040.MediaTypes;
+import org.opendaylight.restconf.nb.rfc8040.databind.DatabindContext;
+import org.opendaylight.restconf.nb.rfc8040.databind.DatabindProvider;
+import org.opendaylight.restconf.nb.rfc8040.databind.JsonOperationInputBody;
+import org.opendaylight.restconf.nb.rfc8040.databind.OperationInputBody;
+import org.opendaylight.restconf.nb.rfc8040.databind.XmlOperationInputBody;
+import org.opendaylight.restconf.nb.rfc8040.legacy.InstanceIdentifierContext;
import org.opendaylight.restconf.nb.rfc8040.legacy.NormalizedNodePayload;
-import org.opendaylight.restconf.nb.rfc8040.rests.services.api.RestconfInvokeOperationsService;
-import org.opendaylight.restconf.nb.rfc8040.streams.Configuration;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.device.notification.rev221106.SubscribeDeviceNotificationInput;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.remote.rev140114.CreateDataChangeEventSubscriptionInput;
+import org.opendaylight.restconf.nb.rfc8040.streams.ListenersBroker;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.device.notification.rev221106.SubscribeDeviceNotification;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.remote.rev140114.CreateDataChangeEventSubscription;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.remote.rev140114.CreateNotificationStream;
import org.opendaylight.yangtools.yang.common.ErrorTag;
import org.opendaylight.yangtools.yang.common.ErrorType;
-import org.opendaylight.yangtools.yang.common.QName;
-import org.opendaylight.yangtools.yang.common.QNameModule;
-import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
-import org.opendaylight.yangtools.yang.common.YangConstants;
import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
-import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
-import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
-import org.opendaylight.yangtools.yang.model.api.EffectiveModelContext;
-import org.opendaylight.yangtools.yang.model.api.SchemaNode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
- * Implementation of {@link RestconfInvokeOperationsService}.
- *
+ * An operation resource represents a protocol operation defined with the YANG {@code rpc} statement. It is invoked
+ * using a POST method on the operation resource.
*/
@Path("/")
-public class RestconfInvokeOperationsServiceImpl implements RestconfInvokeOperationsService {
+public final class RestconfInvokeOperationsServiceImpl {
private static final Logger LOG = LoggerFactory.getLogger(RestconfInvokeOperationsServiceImpl.class);
- // FIXME: at some point we do not want to have this here, as this is only used for dispatch
- private static final QNameModule SAL_REMOTE_NAMESPACE = CreateDataChangeEventSubscriptionInput.QNAME.getModule();
- private static final QNameModule DEVICE_NOTIFICATION_NAMESPACE = SubscribeDeviceNotificationInput.QNAME.getModule();
-
- private final DOMRpcService rpcService;
+ private final DatabindProvider databindProvider;
+ private final MdsalRestconfServer server;
+ @Deprecated(forRemoval = true)
private final DOMMountPointService mountPointService;
- private final SubscribeToStreamUtil streamUtils;
+ private final ListenersBroker listenersBroker;
- public RestconfInvokeOperationsServiceImpl(final DOMRpcService rpcService,
- final DOMMountPointService mountPointService, final Configuration configuration) {
- this.rpcService = requireNonNull(rpcService);
+ public RestconfInvokeOperationsServiceImpl(final DatabindProvider databindProvider,
+ final MdsalRestconfServer server, final DOMMountPointService mountPointService,
+ final ListenersBroker listenersBroker) {
+ this.databindProvider = requireNonNull(databindProvider);
+ this.server = requireNonNull(server);
this.mountPointService = requireNonNull(mountPointService);
- streamUtils = configuration.isUseSSE() ? SubscribeToStreamUtil.serverSentEvents()
- : SubscribeToStreamUtil.webSockets();
- }
-
- @Override
- public void invokeRpc(final String identifier, final NormalizedNodePayload payload, final UriInfo uriInfo,
- final AsyncResponse ar) {
- final InstanceIdentifierContext context = payload.getInstanceIdentifierContext();
- final EffectiveModelContext schemaContext = context.getSchemaContext();
- final DOMMountPoint mountPoint = context.getMountPoint();
- final SchemaNode schema = context.getSchemaNode();
- final QName rpcName = schema.getQName();
-
- final ListenableFuture<? extends DOMRpcResult> future;
- if (mountPoint == null) {
- // FIXME: this really should be a normal RPC invocation service which has its own interface with JAX-RS,
- // except ... we check 'identifier' for .contains() instead of exact RPC name!
- if (SAL_REMOTE_NAMESPACE.equals(rpcName.getModule())) {
- if (identifier.contains("create-data-change-event-subscription")) {
- future = Futures.immediateFuture(
- CreateStreamUtil.createDataChangeNotifiStream(payload, schemaContext));
- } else {
- future = Futures.immediateFailedFuture(new RestconfDocumentedException("Unsupported operation",
- ErrorType.RPC, ErrorTag.OPERATION_NOT_SUPPORTED));
- }
- } else if (DEVICE_NOTIFICATION_NAMESPACE.equals(rpcName.getModule())) {
- // FIXME: this should be a match on RPC QName
- final String baseUrl = streamUtils.prepareUriByStreamName(uriInfo, "").toString();
- future = Futures.immediateFuture(CreateStreamUtil.createDeviceNotificationListener(baseUrl, payload,
- streamUtils, mountPointService));
- } else {
- future = invokeRpc((ContainerNode)payload.getData(), rpcName, rpcService);
- }
- } else {
- future = invokeRpc(payload.getData(), rpcName, mountPoint);
- }
-
- Futures.addCallback(future, new FutureCallback<DOMRpcResult>() {
- @Override
- public void onSuccess(final DOMRpcResult response) {
- final var errors = response.getErrors();
- if (!errors.isEmpty()) {
- LOG.debug("RpcError message {}", response.getErrors());
- ar.resume(new RestconfDocumentedException("RPCerror message ", null, response.getErrors()));
- return;
- }
-
- final NormalizedNode resultData = response.getResult();
- if (resultData == null || ((ContainerNode) resultData).isEmpty()) {
- ar.resume(new WebApplicationException(Status.NO_CONTENT));
- } else {
- ar.resume(NormalizedNodePayload.of(context, resultData));
- }
- }
-
- @Override
- public void onFailure(final Throwable failure) {
- ar.resume(failure);
- }
- }, MoreExecutors.directExecutor());
+ this.listenersBroker = requireNonNull(listenersBroker);
}
/**
- * Invoking rpc via mount point.
+ * Invoke RPC operation.
*
- * @param mountPoint mount point
- * @param data input data
- * @param rpc RPC type
- * @return {@link DOMRpcResult}
+ * @param identifier module name and rpc identifier string for the desired operation
+ * @param body the body of the operation
+ * @param uriInfo URI info
+ * @param ar {@link AsyncResponse} which needs to be completed with a {@link NormalizedNodePayload} output
*/
- @VisibleForTesting
- static ListenableFuture<? extends DOMRpcResult> invokeRpc(final NormalizedNode data, final QName rpc,
- final DOMMountPoint mountPoint) {
- return invokeRpc((ContainerNode) data, rpc, mountPoint.getService(DOMRpcService.class).orElseThrow(() -> {
- final String errmsg = "RPC service is missing.";
- LOG.debug(errmsg);
- return new RestconfDocumentedException(errmsg);
- }));
+ @POST
+ // FIXME: identifier is just a *single* QName
+ @Path("/operations/{identifier:.+}")
+ @Consumes({
+ MediaTypes.APPLICATION_YANG_DATA_XML,
+ MediaType.APPLICATION_XML,
+ MediaType.TEXT_XML
+ })
+ @Produces({
+ MediaTypes.APPLICATION_YANG_DATA_JSON,
+ MediaTypes.APPLICATION_YANG_DATA_XML,
+ MediaType.APPLICATION_JSON,
+ MediaType.APPLICATION_XML,
+ MediaType.TEXT_XML
+ })
+ public void invokeRpcXML(@Encoded @PathParam("identifier") final String identifier, final InputStream body,
+ @Context final UriInfo uriInfo, @Suspended final AsyncResponse ar) {
+ try (var xmlBody = new XmlOperationInputBody(body)) {
+ invokeRpc(identifier, uriInfo, ar, xmlBody);
+ }
}
/**
- * Invoke rpc.
+ * Invoke RPC operation.
*
- * @param data input data
- * @param rpc RPC type
- * @param rpcService rpc service to invoke rpc
- * @return {@link DOMRpcResult}
+ * @param identifier module name and rpc identifier string for the desired operation
+ * @param body the body of the operation
+ * @param uriInfo URI info
+ * @param ar {@link AsyncResponse} which needs to be completed with a {@link NormalizedNodePayload} output
*/
- @VisibleForTesting
- static ListenableFuture<? extends DOMRpcResult> invokeRpc(final ContainerNode data, final QName rpc,
- final DOMRpcService rpcService) {
- return Futures.catching(rpcService.invokeRpc(rpc, nonnullInput(rpc, data)),
- DOMRpcException.class,
- cause -> new DefaultDOMRpcResult(List.of(RpcResultBuilder.newError(ErrorType.RPC, ErrorTag.OPERATION_FAILED,
- cause.getMessage()))),
- MoreExecutors.directExecutor());
+ @POST
+ // FIXME: identifier is just a *single* QName
+ @Path("/operations/{identifier:.+}")
+ @Consumes({
+ MediaTypes.APPLICATION_YANG_DATA_JSON,
+ MediaType.APPLICATION_JSON,
+ })
+ @Produces({
+ MediaTypes.APPLICATION_YANG_DATA_JSON,
+ MediaTypes.APPLICATION_YANG_DATA_XML,
+ MediaType.APPLICATION_JSON,
+ MediaType.APPLICATION_XML,
+ MediaType.TEXT_XML
+ })
+ public void invokeRpcJSON(@Encoded @PathParam("identifier") final String identifier, final InputStream body,
+ @Context final UriInfo uriInfo, @Suspended final AsyncResponse ar) {
+ try (var jsonBody = new JsonOperationInputBody(body)) {
+ invokeRpc(identifier, uriInfo, ar, jsonBody);
+ }
}
- private static @NonNull ContainerNode nonnullInput(final QName type, final ContainerNode input) {
- return input != null ? input
- : ImmutableNodes.containerNode(YangConstants.operationInputQName(type.getModule()));
- }
+ private void invokeRpc(final String identifier, final UriInfo uriInfo, final AsyncResponse ar,
+ final OperationInputBody body) {
+ final var databind = databindProvider.currentContext();
+ final var reqPath = server.bindRequestPath(databind, identifier);
- @Deprecated
- static <T> T checkedGet(final ListenableFuture<T> future) {
+ final ContainerNode input;
try {
- return future.get();
- } catch (InterruptedException e) {
- throw new RestconfDocumentedException("Interrupted while waiting for result of invocation", e);
- } catch (ExecutionException e) {
- Throwables.throwIfInstanceOf(e.getCause(), RestconfDocumentedException.class);
- throw new RestconfDocumentedException("Invocation failed", e);
+ input = body.toContainerNode(reqPath.inference());
+ } catch (IOException e) {
+ LOG.debug("Error reading input", e);
+ throw new RestconfDocumentedException("Error parsing input: " + e.getMessage(), ErrorType.PROTOCOL,
+ ErrorTag.MALFORMED_MESSAGE, e);
}
+
+ hackInvokeRpc(databind, reqPath, uriInfo, input).addCallback(new JaxRsRestconfCallback<>(ar) {
+ @Override
+ Response transform(final Optional<ContainerNode> result) {
+ return result
+ .filter(output -> !output.isEmpty())
+ .map(output -> Response.ok().entity(new NormalizedNodePayload(reqPath.inference(), output)).build())
+ .orElseGet(() -> Response.noContent().build());
+ }
+ });
+ }
+
+ private RestconfFuture<Optional<ContainerNode>> hackInvokeRpc(final DatabindContext localDatabind,
+ final InstanceIdentifierContext reqPath, final UriInfo uriInfo, final ContainerNode input) {
+ // RPC type
+ final var type = reqPath.getSchemaNode().getQName();
+ final var mountPoint = reqPath.getMountPoint();
+ if (mountPoint == null) {
+ // Hacked-up integration of streams
+ if (CreateDataChangeEventSubscription.QNAME.equals(type)) {
+ return CreateStreamUtil.createDataChangeNotifiStream(listenersBroker, input,
+ localDatabind.modelContext());
+ } else if (CreateNotificationStream.QNAME.equals(type)) {
+ return CreateStreamUtil.createNotificationStream(listenersBroker, input,
+ localDatabind.modelContext());
+ } else if (SubscribeDeviceNotification.QNAME.equals(type)) {
+ return CreateStreamUtil.createDeviceNotificationListener(listenersBroker, input,
+ listenersBroker.prepareUriByStreamName(uriInfo, "").toString(), mountPointService);
+ }
+ }
+
+ return server.getRestconfStrategy(reqPath.getSchemaContext(), mountPoint).invokeRpc(type, input);
}
}