Create MdsalRestconfServer
[netconf.git] / restconf / restconf-nb / src / main / java / org / opendaylight / restconf / nb / rfc8040 / rests / services / impl / RestconfInvokeOperationsServiceImpl.java
index cbd309685573490555203d39dd176db939bad968..0fc3d26c115d91a00c54cb97031880f8e682befa 100644 (file)
@@ -9,14 +9,12 @@ package org.opendaylight.restconf.nb.rfc8040.rests.services.impl;
 
 import static java.util.Objects.requireNonNull;
 
-import com.google.common.annotations.VisibleForTesting;
 import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.Futures;
-import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.MoreExecutors;
 import java.io.IOException;
 import java.io.InputStream;
-import java.util.List;
+import java.util.Optional;
 import javax.ws.rs.Consumes;
 import javax.ws.rs.Encoded;
 import javax.ws.rs.POST;
@@ -29,31 +27,23 @@ import javax.ws.rs.core.Context;
 import javax.ws.rs.core.MediaType;
 import javax.ws.rs.core.Response;
 import javax.ws.rs.core.UriInfo;
-import org.eclipse.jdt.annotation.NonNull;
-import org.opendaylight.mdsal.dom.api.DOMMountPoint;
 import org.opendaylight.mdsal.dom.api.DOMMountPointService;
-import org.opendaylight.mdsal.dom.api.DOMRpcException;
-import org.opendaylight.mdsal.dom.api.DOMRpcResult;
-import org.opendaylight.mdsal.dom.api.DOMRpcService;
-import org.opendaylight.mdsal.dom.spi.DefaultDOMRpcResult;
 import org.opendaylight.restconf.common.errors.RestconfDocumentedException;
+import org.opendaylight.restconf.common.errors.RestconfFuture;
 import org.opendaylight.restconf.nb.rfc8040.MediaTypes;
+import org.opendaylight.restconf.nb.rfc8040.databind.DatabindContext;
 import org.opendaylight.restconf.nb.rfc8040.databind.DatabindProvider;
 import org.opendaylight.restconf.nb.rfc8040.databind.JsonOperationInputBody;
 import org.opendaylight.restconf.nb.rfc8040.databind.OperationInputBody;
 import org.opendaylight.restconf.nb.rfc8040.databind.XmlOperationInputBody;
+import org.opendaylight.restconf.nb.rfc8040.legacy.InstanceIdentifierContext;
 import org.opendaylight.restconf.nb.rfc8040.legacy.NormalizedNodePayload;
 import org.opendaylight.restconf.nb.rfc8040.streams.StreamsConfiguration;
-import org.opendaylight.restconf.nb.rfc8040.utils.parser.ParserIdentifier;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.device.notification.rev221106.SubscribeDeviceNotification;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.remote.rev140114.CreateDataChangeEventSubscription;
 import org.opendaylight.yangtools.yang.common.ErrorTag;
 import org.opendaylight.yangtools.yang.common.ErrorType;
-import org.opendaylight.yangtools.yang.common.QName;
-import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
-import org.opendaylight.yangtools.yang.common.YangConstants;
 import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
-import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -66,14 +56,16 @@ public final class RestconfInvokeOperationsServiceImpl {
     private static final Logger LOG = LoggerFactory.getLogger(RestconfInvokeOperationsServiceImpl.class);
 
     private final DatabindProvider databindProvider;
-    private final DOMRpcService rpcService;
+    private final MdsalRestconfServer server;
+    @Deprecated(forRemoval = true)
     private final DOMMountPointService mountPointService;
     private final SubscribeToStreamUtil streamUtils;
 
-    public RestconfInvokeOperationsServiceImpl(final DatabindProvider databindProvider, final DOMRpcService rpcService,
-            final DOMMountPointService mountPointService, final StreamsConfiguration configuration) {
+    public RestconfInvokeOperationsServiceImpl(final DatabindProvider databindProvider,
+            final MdsalRestconfServer server, final DOMMountPointService mountPointService,
+            final StreamsConfiguration configuration) {
         this.databindProvider = requireNonNull(databindProvider);
-        this.rpcService = requireNonNull(rpcService);
+        this.server = requireNonNull(server);
         this.mountPointService = requireNonNull(mountPointService);
         streamUtils = configuration.useSSE() ? SubscribeToStreamUtil.serverSentEvents()
             : SubscribeToStreamUtil.webSockets();
@@ -140,53 +132,29 @@ public final class RestconfInvokeOperationsServiceImpl {
 
     private void invokeRpc(final String identifier, final UriInfo uriInfo, final AsyncResponse ar,
             final OperationInputBody body) {
-        final var dataBind = databindProvider.currentContext();
-        final var schemaContext = dataBind.modelContext();
-        final var context = ParserIdentifier.toInstanceIdentifier(identifier, schemaContext, mountPointService);
+        final var databind = databindProvider.currentContext();
+        final var reqPath = server.bindRequestPath(databind, identifier);
 
         final ContainerNode input;
         try {
-            input = body.toContainerNode(context.inference());
+            input = body.toContainerNode(reqPath.inference());
         } catch (IOException e) {
             LOG.debug("Error reading input", e);
             throw new RestconfDocumentedException("Error parsing input: " + e.getMessage(), ErrorType.PROTOCOL,
                     ErrorTag.MALFORMED_MESSAGE, e);
         }
-        final var rpcName = context.getSchemaNode().getQName();
 
-        final ListenableFuture<? extends DOMRpcResult> future;
-        final var mountPoint = context.getMountPoint();
-        if (mountPoint == null) {
-            if (CreateDataChangeEventSubscription.QNAME.equals(rpcName)) {
-                future = Futures.immediateFuture(CreateStreamUtil.createDataChangeNotifiStream(
-                    streamUtils.listenersBroker(), input, schemaContext));
-            } else if (SubscribeDeviceNotification.QNAME.equals(rpcName)) {
-                final String baseUrl = streamUtils.prepareUriByStreamName(uriInfo, "").toString();
-                future = Futures.immediateFuture(CreateStreamUtil.createDeviceNotificationListener(baseUrl, input,
-                    streamUtils, mountPointService));
-            } else {
-                future = invokeRpc(input, rpcName, rpcService);
-            }
-        } else {
-            future = invokeRpc(input, rpcName, mountPoint);
-        }
-
-        Futures.addCallback(future, new FutureCallback<DOMRpcResult>() {
+        Futures.addCallback(hackInvokeRpc(databind, reqPath, uriInfo, input), new FutureCallback<>() {
             @Override
-            public void onSuccess(final DOMRpcResult response) {
-                final var errors = response.errors();
-                if (!errors.isEmpty()) {
-                    LOG.debug("RpcError message {}", response.errors());
-                    ar.resume(new RestconfDocumentedException("RPCerror message ", null, response.errors()));
-                    return;
-                }
-
-                final ContainerNode resultData = response.value();
-                if (resultData == null || resultData.isEmpty()) {
-                    ar.resume(Response.noContent().build());
-                } else {
-                    ar.resume(Response.ok().entity(new NormalizedNodePayload(context.inference(), resultData)).build());
+            public void onSuccess(final Optional<ContainerNode> result) {
+                if (result.isPresent()) {
+                    final var output = result.orElseThrow();
+                    if (!output.isEmpty()) {
+                        ar.resume(Response.ok().entity(new NormalizedNodePayload(reqPath.inference(), output)).build());
+                        return;
+                    }
                 }
+                ar.resume(Response.noContent().build());
             }
 
             @Override
@@ -196,43 +164,23 @@ public final class RestconfInvokeOperationsServiceImpl {
         }, MoreExecutors.directExecutor());
     }
 
-    /**
-     * Invoking rpc via mount point.
-     *
-     * @param mountPoint mount point
-     * @param data input data
-     * @param rpc RPC type
-     * @return {@link DOMRpcResult}
-     */
-    @VisibleForTesting
-    static ListenableFuture<? extends DOMRpcResult> invokeRpc(final ContainerNode data, final QName rpc,
-            final DOMMountPoint mountPoint) {
-        return invokeRpc(data, rpc, mountPoint.getService(DOMRpcService.class).orElseThrow(() -> {
-            final String errmsg = "RPC service is missing.";
-            LOG.debug(errmsg);
-            return new RestconfDocumentedException(errmsg);
-        }));
-    }
-
-    /**
-     * Invoke rpc.
-     *
-     * @param input input data
-     * @param rpc RPC type
-     * @param rpcService rpc service to invoke rpc
-     * @return {@link DOMRpcResult}
-     */
-    @VisibleForTesting
-    static ListenableFuture<? extends DOMRpcResult> invokeRpc(final ContainerNode input, final QName rpc,
-            final DOMRpcService rpcService) {
-        return Futures.catching(rpcService.invokeRpc(rpc, nonnullInput(rpc, input)), DOMRpcException.class,
-            cause -> new DefaultDOMRpcResult(List.of(RpcResultBuilder.newError(ErrorType.RPC, ErrorTag.OPERATION_FAILED,
-                cause.getMessage()))),
-            MoreExecutors.directExecutor());
-    }
+    private RestconfFuture<Optional<ContainerNode>> hackInvokeRpc(final DatabindContext localDatabind,
+            final InstanceIdentifierContext reqPath, final UriInfo uriInfo, final ContainerNode input) {
+        // RPC type
+        final var type = reqPath.getSchemaNode().getQName();
+        final var mountPoint = reqPath.getMountPoint();
+        if (mountPoint == null) {
+            // Hacked-up integration of streams
+            if (CreateDataChangeEventSubscription.QNAME.equals(type)) {
+                return RestconfFuture.of(Optional.of(CreateStreamUtil.createDataChangeNotifiStream(
+                    streamUtils.listenersBroker(), input, localDatabind.modelContext())));
+            } else if (SubscribeDeviceNotification.QNAME.equals(type)) {
+                final var baseUrl = streamUtils.prepareUriByStreamName(uriInfo, "").toString();
+                return RestconfFuture.of(Optional.of(CreateStreamUtil.createDeviceNotificationListener(baseUrl, input,
+                    streamUtils, mountPointService)));
+            }
+        }
 
-    private static @NonNull ContainerNode nonnullInput(final QName type, final ContainerNode input) {
-        return input != null ? input
-                : ImmutableNodes.containerNode(YangConstants.operationInputQName(type.getModule()));
+        return server.getRestconfStrategy(reqPath.getSchemaContext(), mountPoint).invokeRpc(type, input);
     }
 }