Introduce restconf.server.{api,spi,mdsal}
[netconf.git] / restconf / restconf-nb / src / main / java / org / opendaylight / restconf / nb / rfc8040 / rests / services / impl / RestconfInvokeOperationsServiceImpl.java
index e8b28f32521fd0866b34bc10381e30f6f9b6a088..9819b1b675ca9a9910d80edb5ec078dbdf452e8e 100644 (file)
@@ -9,179 +9,107 @@ package org.opendaylight.restconf.nb.rfc8040.rests.services.impl;
 
 import static java.util.Objects.requireNonNull;
 
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Throwables;
-import com.google.common.util.concurrent.FutureCallback;
-import com.google.common.util.concurrent.Futures;
-import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.MoreExecutors;
-import java.util.List;
-import java.util.concurrent.ExecutionException;
+import java.io.InputStream;
+import javax.ws.rs.Consumes;
+import javax.ws.rs.Encoded;
+import javax.ws.rs.POST;
 import javax.ws.rs.Path;
-import javax.ws.rs.WebApplicationException;
+import javax.ws.rs.PathParam;
+import javax.ws.rs.Produces;
 import javax.ws.rs.container.AsyncResponse;
-import javax.ws.rs.core.Response.Status;
+import javax.ws.rs.container.Suspended;
+import javax.ws.rs.core.Context;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
 import javax.ws.rs.core.UriInfo;
-import org.eclipse.jdt.annotation.NonNull;
-import org.opendaylight.mdsal.dom.api.DOMMountPoint;
-import org.opendaylight.mdsal.dom.api.DOMMountPointService;
-import org.opendaylight.mdsal.dom.api.DOMRpcException;
-import org.opendaylight.mdsal.dom.api.DOMRpcResult;
-import org.opendaylight.mdsal.dom.api.DOMRpcService;
-import org.opendaylight.mdsal.dom.spi.DefaultDOMRpcResult;
-import org.opendaylight.restconf.common.context.InstanceIdentifierContext;
-import org.opendaylight.restconf.common.errors.RestconfDocumentedException;
+import org.opendaylight.restconf.nb.rfc8040.MediaTypes;
+import org.opendaylight.restconf.nb.rfc8040.databind.JsonOperationInputBody;
+import org.opendaylight.restconf.nb.rfc8040.databind.OperationInputBody;
+import org.opendaylight.restconf.nb.rfc8040.databind.XmlOperationInputBody;
 import org.opendaylight.restconf.nb.rfc8040.legacy.NormalizedNodePayload;
-import org.opendaylight.restconf.nb.rfc8040.rests.services.api.RestconfInvokeOperationsService;
-import org.opendaylight.restconf.nb.rfc8040.streams.StreamsConfiguration;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.device.notification.rev221106.SubscribeDeviceNotificationInput;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.remote.rev140114.CreateDataChangeEventSubscriptionInput;
-import org.opendaylight.yangtools.yang.common.ErrorTag;
-import org.opendaylight.yangtools.yang.common.ErrorType;
-import org.opendaylight.yangtools.yang.common.QName;
-import org.opendaylight.yangtools.yang.common.QNameModule;
-import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
-import org.opendaylight.yangtools.yang.common.YangConstants;
-import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
-import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
-import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
-import org.opendaylight.yangtools.yang.model.api.EffectiveModelContext;
-import org.opendaylight.yangtools.yang.model.api.SchemaNode;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import org.opendaylight.restconf.server.spi.OperationOutput;
 
 /**
- * Implementation of {@link RestconfInvokeOperationsService}.
- *
+ * An operation resource represents a protocol operation defined with the YANG {@code rpc} statement. It is invoked
+ * using a POST method on the operation resource.
  */
 @Path("/")
-public class RestconfInvokeOperationsServiceImpl implements RestconfInvokeOperationsService {
-    private static final Logger LOG = LoggerFactory.getLogger(RestconfInvokeOperationsServiceImpl.class);
-
-    // FIXME: at some point we do not want to have this here, as this is only used for dispatch
-    private static final QNameModule SAL_REMOTE_NAMESPACE = CreateDataChangeEventSubscriptionInput.QNAME.getModule();
-    private static final QNameModule DEVICE_NOTIFICATION_NAMESPACE = SubscribeDeviceNotificationInput.QNAME.getModule();
-
-    private final DOMRpcService rpcService;
-    private final DOMMountPointService mountPointService;
-    private final SubscribeToStreamUtil streamUtils;
+public final class RestconfInvokeOperationsServiceImpl {
+    private final MdsalRestconfServer server;
 
-    public RestconfInvokeOperationsServiceImpl(final DOMRpcService rpcService,
-            final DOMMountPointService mountPointService, final StreamsConfiguration configuration) {
-        this.rpcService = requireNonNull(rpcService);
-        this.mountPointService = requireNonNull(mountPointService);
-        streamUtils = configuration.useSSE() ? SubscribeToStreamUtil.serverSentEvents()
-            : SubscribeToStreamUtil.webSockets();
-    }
-
-    @Override
-    public void invokeRpc(final String identifier, final NormalizedNodePayload payload, final UriInfo uriInfo,
-            final AsyncResponse ar) {
-        final InstanceIdentifierContext context = payload.getInstanceIdentifierContext();
-        final EffectiveModelContext schemaContext = context.getSchemaContext();
-        final DOMMountPoint mountPoint = context.getMountPoint();
-        final SchemaNode schema = context.getSchemaNode();
-        final QName rpcName = schema.getQName();
-
-        final ListenableFuture<? extends DOMRpcResult> future;
-        if (mountPoint == null) {
-            // FIXME: this really should be a normal RPC invocation service which has its own interface with JAX-RS,
-            //        except ... we check 'identifier' for .contains() instead of exact RPC name!
-            if (SAL_REMOTE_NAMESPACE.equals(rpcName.getModule())) {
-                if (identifier.contains("create-data-change-event-subscription")) {
-                    future = Futures.immediateFuture(
-                        CreateStreamUtil.createDataChangeNotifiStream(payload, schemaContext));
-                } else {
-                    future = Futures.immediateFailedFuture(new RestconfDocumentedException("Unsupported operation",
-                        ErrorType.RPC, ErrorTag.OPERATION_NOT_SUPPORTED));
-                }
-            } else if (DEVICE_NOTIFICATION_NAMESPACE.equals(rpcName.getModule())) {
-                // FIXME: this should be a match on RPC QName
-                final String baseUrl = streamUtils.prepareUriByStreamName(uriInfo, "").toString();
-                future = Futures.immediateFuture(CreateStreamUtil.createDeviceNotificationListener(baseUrl, payload,
-                    streamUtils, mountPointService));
-            } else {
-                future = invokeRpc((ContainerNode)payload.getData(), rpcName, rpcService);
-            }
-        } else {
-            future = invokeRpc(payload.getData(), rpcName, mountPoint);
-        }
-
-        Futures.addCallback(future, new FutureCallback<DOMRpcResult>() {
-            @Override
-            public void onSuccess(final DOMRpcResult response) {
-                final var errors = response.getErrors();
-                if (!errors.isEmpty()) {
-                    LOG.debug("RpcError message {}", response.getErrors());
-                    ar.resume(new RestconfDocumentedException("RPCerror message ", null, response.getErrors()));
-                    return;
-                }
-
-                final NormalizedNode resultData = response.getResult();
-                if (resultData == null || ((ContainerNode) resultData).isEmpty()) {
-                    ar.resume(new WebApplicationException(Status.NO_CONTENT));
-                } else {
-                    ar.resume(NormalizedNodePayload.of(context, resultData));
-                }
-            }
-
-            @Override
-            public void onFailure(final Throwable failure) {
-                ar.resume(failure);
-            }
-        }, MoreExecutors.directExecutor());
+    public RestconfInvokeOperationsServiceImpl(final MdsalRestconfServer server) {
+        this.server = requireNonNull(server);
     }
 
     /**
-     * Invoking rpc via mount point.
+     * Invoke RPC operation.
      *
-     * @param mountPoint mount point
-     * @param data input data
-     * @param rpc RPC type
-     * @return {@link DOMRpcResult}
+     * @param identifier module name and rpc identifier string for the desired operation
+     * @param body the body of the operation
+     * @param uriInfo URI info
+     * @param ar {@link AsyncResponse} which needs to be completed with a {@link NormalizedNodePayload} output
      */
-    @VisibleForTesting
-    static ListenableFuture<? extends DOMRpcResult> invokeRpc(final NormalizedNode data, final QName rpc,
-            final DOMMountPoint mountPoint) {
-        return invokeRpc((ContainerNode) data, rpc, mountPoint.getService(DOMRpcService.class).orElseThrow(() -> {
-            final String errmsg = "RPC service is missing.";
-            LOG.debug(errmsg);
-            return new RestconfDocumentedException(errmsg);
-        }));
+    @POST
+    // FIXME: identifier is just a *single* QName
+    @Path("/operations/{identifier:.+}")
+    @Consumes({
+        MediaTypes.APPLICATION_YANG_DATA_XML,
+        MediaType.APPLICATION_XML,
+        MediaType.TEXT_XML
+    })
+    @Produces({
+        MediaTypes.APPLICATION_YANG_DATA_JSON,
+        MediaTypes.APPLICATION_YANG_DATA_XML,
+        MediaType.APPLICATION_JSON,
+        MediaType.APPLICATION_XML,
+        MediaType.TEXT_XML
+    })
+    public void invokeRpcXML(@Encoded @PathParam("identifier") final String identifier, final InputStream body,
+            @Context final UriInfo uriInfo, @Suspended final AsyncResponse ar) {
+        try (var xmlBody = new XmlOperationInputBody(body)) {
+            invokeRpc(identifier, uriInfo, ar, xmlBody);
+        }
     }
 
     /**
-     * Invoke rpc.
+     * Invoke RPC operation.
      *
-     * @param data input data
-     * @param rpc RPC type
-     * @param rpcService rpc service to invoke rpc
-     * @return {@link DOMRpcResult}
+     * @param identifier module name and rpc identifier string for the desired operation
+     * @param body the body of the operation
+     * @param uriInfo URI info
+     * @param ar {@link AsyncResponse} which needs to be completed with a {@link NormalizedNodePayload} output
      */
-    @VisibleForTesting
-    static ListenableFuture<? extends DOMRpcResult> invokeRpc(final ContainerNode data, final QName rpc,
-            final DOMRpcService rpcService) {
-        return Futures.catching(rpcService.invokeRpc(rpc, nonnullInput(rpc, data)),
-            DOMRpcException.class,
-            cause -> new DefaultDOMRpcResult(List.of(RpcResultBuilder.newError(ErrorType.RPC, ErrorTag.OPERATION_FAILED,
-                cause.getMessage()))),
-            MoreExecutors.directExecutor());
-    }
-
-    private static @NonNull ContainerNode nonnullInput(final QName type, final ContainerNode input) {
-        return input != null ? input
-                : ImmutableNodes.containerNode(YangConstants.operationInputQName(type.getModule()));
+    @POST
+    // FIXME: identifier is just a *single* QName
+    @Path("/operations/{identifier:.+}")
+    @Consumes({
+        MediaTypes.APPLICATION_YANG_DATA_JSON,
+        MediaType.APPLICATION_JSON,
+    })
+    @Produces({
+        MediaTypes.APPLICATION_YANG_DATA_JSON,
+        MediaTypes.APPLICATION_YANG_DATA_XML,
+        MediaType.APPLICATION_JSON,
+        MediaType.APPLICATION_XML,
+        MediaType.TEXT_XML
+    })
+    public void invokeRpcJSON(@Encoded @PathParam("identifier") final String identifier, final InputStream body,
+            @Context final UriInfo uriInfo, @Suspended final AsyncResponse ar) {
+        try (var jsonBody = new JsonOperationInputBody(body)) {
+            invokeRpc(identifier, uriInfo, ar, jsonBody);
+        }
     }
 
-    @Deprecated
-    static <T> T checkedGet(final ListenableFuture<T> future) {
-        try {
-            return future.get();
-        } catch (InterruptedException e) {
-            throw new RestconfDocumentedException("Interrupted while waiting for result of invocation", e);
-        } catch (ExecutionException e) {
-            Throwables.throwIfInstanceOf(e.getCause(), RestconfDocumentedException.class);
-            throw new RestconfDocumentedException("Invocation failed", e);
-        }
+    private void invokeRpc(final String identifier, final UriInfo uriInfo, final AsyncResponse ar,
+            final OperationInputBody body) {
+        server.invokeRpc(uriInfo.getBaseUri(), identifier, body)
+            .addCallback(new JaxRsRestconfCallback<OperationOutput>(ar) {
+                @Override
+                Response transform(final OperationOutput result) {
+                    final var body = result.output();
+                    return body == null ? Response.noContent().build()
+                        : Response.ok().entity(new NormalizedNodePayload(result.operation(), body)).build();
+                }
+            });
     }
 }