X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=restconf%2Frestconf-nb%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Frestconf%2Fnb%2Frfc8040%2Frests%2Fservices%2Fimpl%2FRestconfInvokeOperationsServiceImpl.java;h=bfab58135c45149a2c1604c06c1a5ede6291b487;hb=992b87d2709c9593bcda654f4962964a74422712;hp=e8c788874755daba60fac89cd4db0d22e6adce7b;hpb=ae70c1d6c29eefe745829bccdacf8d124d8ecfbb;p=netconf.git diff --git a/restconf/restconf-nb/src/main/java/org/opendaylight/restconf/nb/rfc8040/rests/services/impl/RestconfInvokeOperationsServiceImpl.java b/restconf/restconf-nb/src/main/java/org/opendaylight/restconf/nb/rfc8040/rests/services/impl/RestconfInvokeOperationsServiceImpl.java index e8c7888747..bfab58135c 100644 --- a/restconf/restconf-nb/src/main/java/org/opendaylight/restconf/nb/rfc8040/rests/services/impl/RestconfInvokeOperationsServiceImpl.java +++ b/restconf/restconf-nb/src/main/java/org/opendaylight/restconf/nb/rfc8040/rests/services/impl/RestconfInvokeOperationsServiceImpl.java @@ -9,165 +9,168 @@ package org.opendaylight.restconf.nb.rfc8040.rests.services.impl; import static java.util.Objects.requireNonNull; -import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Throwables; -import com.google.common.util.concurrent.FutureCallback; -import com.google.common.util.concurrent.Futures; -import com.google.common.util.concurrent.ListenableFuture; -import com.google.common.util.concurrent.MoreExecutors; -import java.util.List; -import java.util.concurrent.ExecutionException; +import java.io.IOException; +import java.io.InputStream; +import java.util.Optional; +import javax.ws.rs.Consumes; +import javax.ws.rs.Encoded; +import javax.ws.rs.POST; import javax.ws.rs.Path; -import javax.ws.rs.WebApplicationException; +import javax.ws.rs.PathParam; +import javax.ws.rs.Produces; import javax.ws.rs.container.AsyncResponse; -import javax.ws.rs.core.Response.Status; +import javax.ws.rs.container.Suspended; +import javax.ws.rs.core.Context; +import javax.ws.rs.core.MediaType; +import javax.ws.rs.core.Response; import javax.ws.rs.core.UriInfo; -import org.eclipse.jdt.annotation.NonNull; -import org.opendaylight.mdsal.dom.api.DOMMountPoint; import org.opendaylight.mdsal.dom.api.DOMMountPointService; -import org.opendaylight.mdsal.dom.api.DOMRpcException; -import org.opendaylight.mdsal.dom.api.DOMRpcResult; -import org.opendaylight.mdsal.dom.api.DOMRpcService; -import org.opendaylight.mdsal.dom.spi.DefaultDOMRpcResult; -import org.opendaylight.restconf.common.context.InstanceIdentifierContext; import org.opendaylight.restconf.common.errors.RestconfDocumentedException; +import org.opendaylight.restconf.common.errors.RestconfFuture; +import org.opendaylight.restconf.nb.rfc8040.MediaTypes; +import org.opendaylight.restconf.nb.rfc8040.databind.DatabindContext; +import org.opendaylight.restconf.nb.rfc8040.databind.DatabindProvider; +import org.opendaylight.restconf.nb.rfc8040.databind.JsonOperationInputBody; +import org.opendaylight.restconf.nb.rfc8040.databind.OperationInputBody; +import org.opendaylight.restconf.nb.rfc8040.databind.XmlOperationInputBody; +import org.opendaylight.restconf.nb.rfc8040.legacy.InstanceIdentifierContext; import org.opendaylight.restconf.nb.rfc8040.legacy.NormalizedNodePayload; -import org.opendaylight.restconf.nb.rfc8040.rests.services.api.RestconfInvokeOperationsService; -import org.opendaylight.restconf.nb.rfc8040.streams.StreamsConfiguration; +import org.opendaylight.restconf.nb.rfc8040.streams.ListenersBroker; import org.opendaylight.yang.gen.v1.urn.opendaylight.device.notification.rev221106.SubscribeDeviceNotification; import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.remote.rev140114.CreateDataChangeEventSubscription; +import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.remote.rev140114.CreateNotificationStream; import org.opendaylight.yangtools.yang.common.ErrorTag; import org.opendaylight.yangtools.yang.common.ErrorType; -import org.opendaylight.yangtools.yang.common.QName; -import org.opendaylight.yangtools.yang.common.RpcResultBuilder; -import org.opendaylight.yangtools.yang.common.YangConstants; import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode; -import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes; -import org.opendaylight.yangtools.yang.model.api.EffectiveModelContext; -import org.opendaylight.yangtools.yang.model.api.SchemaNode; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** - * Implementation of {@link RestconfInvokeOperationsService}. - * + * An operation resource represents a protocol operation defined with the YANG {@code rpc} statement. It is invoked + * using a POST method on the operation resource. */ @Path("/") -public class RestconfInvokeOperationsServiceImpl implements RestconfInvokeOperationsService { +public final class RestconfInvokeOperationsServiceImpl { private static final Logger LOG = LoggerFactory.getLogger(RestconfInvokeOperationsServiceImpl.class); - private final DOMRpcService rpcService; + private final DatabindProvider databindProvider; + private final MdsalRestconfServer server; + @Deprecated(forRemoval = true) private final DOMMountPointService mountPointService; - private final SubscribeToStreamUtil streamUtils; + private final ListenersBroker listenersBroker; - public RestconfInvokeOperationsServiceImpl(final DOMRpcService rpcService, - final DOMMountPointService mountPointService, final StreamsConfiguration configuration) { - this.rpcService = requireNonNull(rpcService); + public RestconfInvokeOperationsServiceImpl(final DatabindProvider databindProvider, + final MdsalRestconfServer server, final DOMMountPointService mountPointService, + final ListenersBroker listenersBroker) { + this.databindProvider = requireNonNull(databindProvider); + this.server = requireNonNull(server); this.mountPointService = requireNonNull(mountPointService); - streamUtils = configuration.useSSE() ? SubscribeToStreamUtil.serverSentEvents() - : SubscribeToStreamUtil.webSockets(); - } - - @Override - public void invokeRpc(final String identifier, final NormalizedNodePayload payload, final UriInfo uriInfo, - final AsyncResponse ar) { - final InstanceIdentifierContext context = payload.getInstanceIdentifierContext(); - final EffectiveModelContext schemaContext = context.getSchemaContext(); - final DOMMountPoint mountPoint = context.getMountPoint(); - final SchemaNode schema = context.getSchemaNode(); - final QName rpcName = schema.getQName(); - final ContainerNode rpcInput = (ContainerNode) payload.getData(); - - final ListenableFuture future; - if (mountPoint == null) { - if (CreateDataChangeEventSubscription.QNAME.equals(rpcName)) { - future = Futures.immediateFuture( - CreateStreamUtil.createDataChangeNotifiStream(rpcInput, schemaContext)); - } else if (SubscribeDeviceNotification.QNAME.equals(rpcName)) { - final String baseUrl = streamUtils.prepareUriByStreamName(uriInfo, "").toString(); - future = Futures.immediateFuture(CreateStreamUtil.createDeviceNotificationListener(baseUrl, rpcInput, - streamUtils, mountPointService)); - } else { - future = invokeRpc(rpcInput, rpcName, rpcService); - } - } else { - future = invokeRpc(rpcInput, rpcName, mountPoint); - } - - Futures.addCallback(future, new FutureCallback() { - @Override - public void onSuccess(final DOMRpcResult response) { - final var errors = response.errors(); - if (!errors.isEmpty()) { - LOG.debug("RpcError message {}", response.errors()); - ar.resume(new RestconfDocumentedException("RPCerror message ", null, response.errors())); - return; - } - - final ContainerNode resultData = response.value(); - if (resultData == null || resultData.isEmpty()) { - ar.resume(new WebApplicationException(Status.NO_CONTENT)); - } else { - ar.resume(NormalizedNodePayload.of(context, resultData)); - } - } - - @Override - public void onFailure(final Throwable failure) { - ar.resume(failure); - } - }, MoreExecutors.directExecutor()); + this.listenersBroker = requireNonNull(listenersBroker); } /** - * Invoking rpc via mount point. + * Invoke RPC operation. * - * @param mountPoint mount point - * @param data input data - * @param rpc RPC type - * @return {@link DOMRpcResult} + * @param identifier module name and rpc identifier string for the desired operation + * @param body the body of the operation + * @param uriInfo URI info + * @param ar {@link AsyncResponse} which needs to be completed with a {@link NormalizedNodePayload} output */ - @VisibleForTesting - static ListenableFuture invokeRpc(final ContainerNode data, final QName rpc, - final DOMMountPoint mountPoint) { - return invokeRpc(data, rpc, mountPoint.getService(DOMRpcService.class).orElseThrow(() -> { - final String errmsg = "RPC service is missing."; - LOG.debug(errmsg); - return new RestconfDocumentedException(errmsg); - })); + @POST + // FIXME: identifier is just a *single* QName + @Path("/operations/{identifier:.+}") + @Consumes({ + MediaTypes.APPLICATION_YANG_DATA_XML, + MediaType.APPLICATION_XML, + MediaType.TEXT_XML + }) + @Produces({ + MediaTypes.APPLICATION_YANG_DATA_JSON, + MediaTypes.APPLICATION_YANG_DATA_XML, + MediaType.APPLICATION_JSON, + MediaType.APPLICATION_XML, + MediaType.TEXT_XML + }) + public void invokeRpcXML(@Encoded @PathParam("identifier") final String identifier, final InputStream body, + @Context final UriInfo uriInfo, @Suspended final AsyncResponse ar) { + try (var xmlBody = new XmlOperationInputBody(body)) { + invokeRpc(identifier, uriInfo, ar, xmlBody); + } } /** - * Invoke rpc. + * Invoke RPC operation. * - * @param input input data - * @param rpc RPC type - * @param rpcService rpc service to invoke rpc - * @return {@link DOMRpcResult} + * @param identifier module name and rpc identifier string for the desired operation + * @param body the body of the operation + * @param uriInfo URI info + * @param ar {@link AsyncResponse} which needs to be completed with a {@link NormalizedNodePayload} output */ - @VisibleForTesting - static ListenableFuture invokeRpc(final ContainerNode input, final QName rpc, - final DOMRpcService rpcService) { - return Futures.catching(rpcService.invokeRpc(rpc, nonnullInput(rpc, input)), DOMRpcException.class, - cause -> new DefaultDOMRpcResult(List.of(RpcResultBuilder.newError(ErrorType.RPC, ErrorTag.OPERATION_FAILED, - cause.getMessage()))), - MoreExecutors.directExecutor()); + @POST + // FIXME: identifier is just a *single* QName + @Path("/operations/{identifier:.+}") + @Consumes({ + MediaTypes.APPLICATION_YANG_DATA_JSON, + MediaType.APPLICATION_JSON, + }) + @Produces({ + MediaTypes.APPLICATION_YANG_DATA_JSON, + MediaTypes.APPLICATION_YANG_DATA_XML, + MediaType.APPLICATION_JSON, + MediaType.APPLICATION_XML, + MediaType.TEXT_XML + }) + public void invokeRpcJSON(@Encoded @PathParam("identifier") final String identifier, final InputStream body, + @Context final UriInfo uriInfo, @Suspended final AsyncResponse ar) { + try (var jsonBody = new JsonOperationInputBody(body)) { + invokeRpc(identifier, uriInfo, ar, jsonBody); + } } - private static @NonNull ContainerNode nonnullInput(final QName type, final ContainerNode input) { - return input != null ? input - : ImmutableNodes.containerNode(YangConstants.operationInputQName(type.getModule())); - } + private void invokeRpc(final String identifier, final UriInfo uriInfo, final AsyncResponse ar, + final OperationInputBody body) { + final var databind = databindProvider.currentContext(); + final var reqPath = server.bindRequestPath(databind, identifier); - @Deprecated - static T checkedGet(final ListenableFuture future) { + final ContainerNode input; try { - return future.get(); - } catch (InterruptedException e) { - throw new RestconfDocumentedException("Interrupted while waiting for result of invocation", e); - } catch (ExecutionException e) { - Throwables.throwIfInstanceOf(e.getCause(), RestconfDocumentedException.class); - throw new RestconfDocumentedException("Invocation failed", e); + input = body.toContainerNode(reqPath.inference()); + } catch (IOException e) { + LOG.debug("Error reading input", e); + throw new RestconfDocumentedException("Error parsing input: " + e.getMessage(), ErrorType.PROTOCOL, + ErrorTag.MALFORMED_MESSAGE, e); } + + hackInvokeRpc(databind, reqPath, uriInfo, input).addCallback(new JaxRsRestconfCallback<>(ar) { + @Override + Response transform(final Optional result) { + return result + .filter(output -> !output.isEmpty()) + .map(output -> Response.ok().entity(new NormalizedNodePayload(reqPath.inference(), output)).build()) + .orElseGet(() -> Response.noContent().build()); + } + }); + } + + private RestconfFuture> hackInvokeRpc(final DatabindContext localDatabind, + final InstanceIdentifierContext reqPath, final UriInfo uriInfo, final ContainerNode input) { + // RPC type + final var type = reqPath.getSchemaNode().getQName(); + final var mountPoint = reqPath.getMountPoint(); + if (mountPoint == null) { + // Hacked-up integration of streams + if (CreateDataChangeEventSubscription.QNAME.equals(type)) { + return CreateStreamUtil.createDataChangeNotifiStream(listenersBroker, input, + localDatabind.modelContext()); + } else if (CreateNotificationStream.QNAME.equals(type)) { + return CreateStreamUtil.createNotificationStream(listenersBroker, input, + localDatabind.modelContext()); + } else if (SubscribeDeviceNotification.QNAME.equals(type)) { + return CreateStreamUtil.createDeviceNotificationListener(listenersBroker, input, + listenersBroker.prepareUriByStreamName(uriInfo, "").toString(), mountPointService); + } + } + + return server.getRestconfStrategy(reqPath.getSchemaContext(), mountPoint).invokeRpc(type, input); } }