Use AsyncResponse in RPC invocation path
[netconf.git] / restconf / restconf-nb-rfc8040 / src / main / java / org / opendaylight / restconf / nb / rfc8040 / rests / services / impl / RestconfInvokeOperationsServiceImpl.java
index cad9bee1db07be3ebe6c9cb09472e68e70dbe826..4c24e478dd91f44808d7576028e0599a32083521 100644 (file)
@@ -12,15 +12,16 @@ import static java.util.Objects.requireNonNull;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Throwables;
 import com.google.common.collect.ImmutableList;
+import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.MoreExecutors;
 import java.util.Optional;
-import java.util.concurrent.CancellationException;
 import java.util.concurrent.ExecutionException;
 import javax.ws.rs.Path;
 import javax.ws.rs.WebApplicationException;
-import javax.ws.rs.core.Response;
+import javax.ws.rs.container.AsyncResponse;
+import javax.ws.rs.core.Response.Status;
 import javax.ws.rs.core.UriInfo;
 import org.eclipse.jdt.annotation.NonNull;
 import org.opendaylight.mdsal.dom.api.DOMMountPoint;
@@ -46,6 +47,7 @@ import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
 import org.opendaylight.yangtools.yang.model.api.EffectiveModelContext;
 import org.opendaylight.yangtools.yang.model.api.RpcDefinition;
+import org.opendaylight.yangtools.yang.model.api.SchemaNode;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -71,47 +73,58 @@ public class RestconfInvokeOperationsServiceImpl implements RestconfInvokeOperat
     }
 
     @Override
-    public NormalizedNodeContext invokeRpc(final String identifier, final NormalizedNodeContext payload,
-            final UriInfo uriInfo) {
-        final QName schemaPath = payload.getInstanceIdentifierContext().getSchemaNode().getQName();
+    public void invokeRpc(final String identifier, final NormalizedNodeContext payload, final UriInfo uriInfo,
+            final AsyncResponse ar) {
+        final SchemaNode schema = payload.getInstanceIdentifierContext().getSchemaNode();
+        final QName rpcName = schema.getQName();
         final DOMMountPoint mountPoint = payload.getInstanceIdentifierContext().getMountPoint();
-        final XMLNamespace namespace = payload.getInstanceIdentifierContext().getSchemaNode().getQName().getNamespace();
 
-        final DOMRpcResult response;
+        final ListenableFuture<? extends DOMRpcResult> future;
         final EffectiveModelContext schemaContextRef;
         if (mountPoint == null) {
             schemaContextRef = schemaContextHandler.get();
+
             // FIXME: this really should be a normal RPC invocation service which has its own interface with JAX-RS
-            if (SAL_REMOTE_NAMESPACE.equals(namespace)) {
+            if (SAL_REMOTE_NAMESPACE.equals(rpcName.getNamespace())) {
                 if (identifier.contains("create-data-change-event-subscription")) {
-                    response = CreateStreamUtil.createDataChangeNotifiStream(payload, schemaContextRef);
+                    future = Futures.immediateFuture(
+                        CreateStreamUtil.createDataChangeNotifiStream(payload, schemaContextRef));
                 } else {
-                    throw new RestconfDocumentedException("Not supported operation", ErrorType.RPC,
-                            ErrorTag.OPERATION_NOT_SUPPORTED);
+                    future = Futures.immediateFailedFuture(new RestconfDocumentedException("Unsupported operation",
+                        ErrorType.RPC, ErrorTag.OPERATION_NOT_SUPPORTED));
                 }
             } else {
-                response = invokeRpc(payload.getData(), schemaPath, rpcService);
+                future = invokeRpc(payload.getData(), rpcName, rpcService);
             }
         } else {
-            response = invokeRpc(payload.getData(), schemaPath, mountPoint);
             schemaContextRef = modelContext(mountPoint);
+            future = invokeRpc(payload.getData(), rpcName, mountPoint);
         }
 
-        final DOMRpcResult result = checkResponse(response);
+        Futures.addCallback(future, new FutureCallback<DOMRpcResult>() {
+            @Override
+            public void onSuccess(final DOMRpcResult response) {
+                final var errors = response.getErrors();
+                if (!errors.isEmpty()) {
+                    LOG.debug("RpcError message {}", response.getErrors());
+                    ar.resume(new RestconfDocumentedException("RPCerror message ", null, response.getErrors()));
+                    return;
+                }
 
-        RpcDefinition resultNodeSchema = null;
-        NormalizedNode resultData = null;
-        if (result != null && result.getResult() != null) {
-            resultData = result.getResult();
-            resultNodeSchema = (RpcDefinition) payload.getInstanceIdentifierContext().getSchemaNode();
-        }
+                final NormalizedNode resultData = response.getResult();
+                if (resultData == null || ((ContainerNode) resultData).isEmpty()) {
+                    ar.resume(new WebApplicationException(Status.NO_CONTENT));
+                } else {
+                    ar.resume(new NormalizedNodeContext(new InstanceIdentifierContext<>(null, (RpcDefinition) schema,
+                        mountPoint, schemaContextRef), resultData));
+                }
+            }
 
-        if (resultData != null && ((ContainerNode) resultData).isEmpty()) {
-            throw new WebApplicationException(Response.Status.NO_CONTENT);
-        } else {
-            return new NormalizedNodeContext(new InstanceIdentifierContext<>(null, resultNodeSchema, mountPoint,
-                    schemaContextRef), resultData);
-        }
+            @Override
+            public void onFailure(final Throwable failure) {
+                ar.resume(failure);
+            }
+        }, MoreExecutors.directExecutor());
     }
 
     /**
@@ -122,9 +135,9 @@ public class RestconfInvokeOperationsServiceImpl implements RestconfInvokeOperat
      * @param rpc RPC type
      * @return {@link DOMRpcResult}
      */
-    // FIXME: NETCONF-718: we should be returning a future here
     @VisibleForTesting
-    static DOMRpcResult invokeRpc(final NormalizedNode data, final QName rpc, final DOMMountPoint mountPoint) {
+    static ListenableFuture<? extends DOMRpcResult> invokeRpc(final NormalizedNode data, final QName rpc,
+            final DOMMountPoint mountPoint) {
         return invokeRpc(data, rpc, mountPoint.getService(DOMRpcService.class).orElseThrow(() -> {
             final String errmsg = "RPC service is missing.";
             LOG.debug(errmsg);
@@ -140,14 +153,14 @@ public class RestconfInvokeOperationsServiceImpl implements RestconfInvokeOperat
      * @param rpcService rpc service to invoke rpc
      * @return {@link DOMRpcResult}
      */
-    // FIXME: NETCONF-718: we should be returning a future here
     @VisibleForTesting
-    static DOMRpcResult invokeRpc(final NormalizedNode data, final QName rpc, final DOMRpcService rpcService) {
-        return checkedGet(Futures.catching(
-            rpcService.invokeRpc(rpc, nonnullInput(rpc, data)), DOMRpcException.class,
+    static ListenableFuture<? extends DOMRpcResult> invokeRpc(final NormalizedNode data, final QName rpc,
+            final DOMRpcService rpcService) {
+        return Futures.catching(rpcService.invokeRpc(rpc, nonnullInput(rpc, data)),
+            DOMRpcException.class,
             cause -> new DefaultDOMRpcResult(ImmutableList.of(RpcResultBuilder.newError(
                 RpcError.ErrorType.RPC, "operation-failed", cause.getMessage()))),
-            MoreExecutors.directExecutor()));
+            MoreExecutors.directExecutor());
     }
 
     private static @NonNull NormalizedNode nonnullInput(final QName type, final NormalizedNode input) {
@@ -155,30 +168,6 @@ public class RestconfInvokeOperationsServiceImpl implements RestconfInvokeOperat
                 : ImmutableNodes.containerNode(YangConstants.operationInputQName(type.getModule()));
     }
 
-    /**
-     * Check the validity of the result.
-     *
-     * @param response response of rpc
-     * @return {@link DOMRpcResult} result
-     */
-    @VisibleForTesting
-    static DOMRpcResult checkResponse(final DOMRpcResult response) {
-        if (response == null) {
-            return null;
-        }
-        try {
-            if (response.getErrors().isEmpty()) {
-                return response;
-            }
-            LOG.debug("RpcError message {}", response.getErrors());
-            throw new RestconfDocumentedException("RPCerror message ", null, response.getErrors());
-        } catch (final CancellationException e) {
-            final String errMsg = "The operation was cancelled while executing.";
-            LOG.debug("Cancel RpcExecution: {}", errMsg, e);
-            throw new RestconfDocumentedException(errMsg, ErrorType.RPC, ErrorTag.PARTIAL_OPERATION, e);
-        }
-    }
-
     @Deprecated
     static <T> T checkedGet(final ListenableFuture<T> future) {
         try {