Use AsyncResponse in RPC invocation path 88/97488/11
authorRobert Varga <robert.varga@pantheon.tech>
Tue, 14 Sep 2021 09:12:18 +0000 (11:12 +0200)
committerTomas Cere <tomas.cere@pantheon.tech>
Wed, 6 Oct 2021 12:58:01 +0000 (12:58 +0000)
RPC invocation is isolated and is inherently asynchronous. Convert it to
use a @Suspended AsyncResponse.

JIRA: NETCONF-718
Change-Id: I5ba174bd8d69e4e54a085a2f4feac076f07d59ff
Signed-off-by: Robert Varga <robert.varga@pantheon.tech>
restconf/restconf-nb-rfc8040/src/main/java/org/opendaylight/restconf/nb/rfc8040/rests/services/api/RestconfInvokeOperationsService.java
restconf/restconf-nb-rfc8040/src/main/java/org/opendaylight/restconf/nb/rfc8040/rests/services/impl/RestconfInvokeOperationsServiceImpl.java
restconf/restconf-nb-rfc8040/src/main/java/org/opendaylight/restconf/nb/rfc8040/web/WebInitializer.java
restconf/restconf-nb-rfc8040/src/test/java/org/opendaylight/restconf/nb/rfc8040/rests/services/impl/RestconfInvokeOperationsServiceImplTest.java

index 0d5cde11c34157fa672bcc1aae627785c0cc7f53..e0db6a0f5ef8b943b2f05e525cc6e05ab6062671 100644 (file)
@@ -13,6 +13,8 @@ import javax.ws.rs.POST;
 import javax.ws.rs.Path;
 import javax.ws.rs.PathParam;
 import javax.ws.rs.Produces;
+import javax.ws.rs.container.AsyncResponse;
+import javax.ws.rs.container.Suspended;
 import javax.ws.rs.core.Context;
 import javax.ws.rs.core.MediaType;
 import javax.ws.rs.core.UriInfo;
@@ -30,7 +32,7 @@ public interface RestconfInvokeOperationsService {
      * @param identifier module name and rpc identifier string for the desired operation
      * @param payload {@link NormalizedNodeContext} - the body of the operation
      * @param uriInfo URI info
-     * @return {@link NormalizedNodeContext}
+     * @param ar {@link AsyncResponse} which needs to be completed with a {@link NormalizedNodeContext} ouput
      */
     @POST
     @Path("/operations/{identifier:.+}")
@@ -48,6 +50,6 @@ public interface RestconfInvokeOperationsService {
         MediaType.APPLICATION_XML,
         MediaType.TEXT_XML
     })
-    NormalizedNodeContext invokeRpc(@Encoded @PathParam("identifier") String identifier,
-            NormalizedNodeContext payload, @Context UriInfo uriInfo);
+    void invokeRpc(@Encoded @PathParam("identifier") String identifier, NormalizedNodeContext payload,
+        @Context UriInfo uriInfo, @Suspended AsyncResponse ar);
 }
index cad9bee1db07be3ebe6c9cb09472e68e70dbe826..4c24e478dd91f44808d7576028e0599a32083521 100644 (file)
@@ -12,15 +12,16 @@ import static java.util.Objects.requireNonNull;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Throwables;
 import com.google.common.collect.ImmutableList;
+import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.MoreExecutors;
 import java.util.Optional;
-import java.util.concurrent.CancellationException;
 import java.util.concurrent.ExecutionException;
 import javax.ws.rs.Path;
 import javax.ws.rs.WebApplicationException;
-import javax.ws.rs.core.Response;
+import javax.ws.rs.container.AsyncResponse;
+import javax.ws.rs.core.Response.Status;
 import javax.ws.rs.core.UriInfo;
 import org.eclipse.jdt.annotation.NonNull;
 import org.opendaylight.mdsal.dom.api.DOMMountPoint;
@@ -46,6 +47,7 @@ import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
 import org.opendaylight.yangtools.yang.model.api.EffectiveModelContext;
 import org.opendaylight.yangtools.yang.model.api.RpcDefinition;
+import org.opendaylight.yangtools.yang.model.api.SchemaNode;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -71,47 +73,58 @@ public class RestconfInvokeOperationsServiceImpl implements RestconfInvokeOperat
     }
 
     @Override
-    public NormalizedNodeContext invokeRpc(final String identifier, final NormalizedNodeContext payload,
-            final UriInfo uriInfo) {
-        final QName schemaPath = payload.getInstanceIdentifierContext().getSchemaNode().getQName();
+    public void invokeRpc(final String identifier, final NormalizedNodeContext payload, final UriInfo uriInfo,
+            final AsyncResponse ar) {
+        final SchemaNode schema = payload.getInstanceIdentifierContext().getSchemaNode();
+        final QName rpcName = schema.getQName();
         final DOMMountPoint mountPoint = payload.getInstanceIdentifierContext().getMountPoint();
-        final XMLNamespace namespace = payload.getInstanceIdentifierContext().getSchemaNode().getQName().getNamespace();
 
-        final DOMRpcResult response;
+        final ListenableFuture<? extends DOMRpcResult> future;
         final EffectiveModelContext schemaContextRef;
         if (mountPoint == null) {
             schemaContextRef = schemaContextHandler.get();
+
             // FIXME: this really should be a normal RPC invocation service which has its own interface with JAX-RS
-            if (SAL_REMOTE_NAMESPACE.equals(namespace)) {
+            if (SAL_REMOTE_NAMESPACE.equals(rpcName.getNamespace())) {
                 if (identifier.contains("create-data-change-event-subscription")) {
-                    response = CreateStreamUtil.createDataChangeNotifiStream(payload, schemaContextRef);
+                    future = Futures.immediateFuture(
+                        CreateStreamUtil.createDataChangeNotifiStream(payload, schemaContextRef));
                 } else {
-                    throw new RestconfDocumentedException("Not supported operation", ErrorType.RPC,
-                            ErrorTag.OPERATION_NOT_SUPPORTED);
+                    future = Futures.immediateFailedFuture(new RestconfDocumentedException("Unsupported operation",
+                        ErrorType.RPC, ErrorTag.OPERATION_NOT_SUPPORTED));
                 }
             } else {
-                response = invokeRpc(payload.getData(), schemaPath, rpcService);
+                future = invokeRpc(payload.getData(), rpcName, rpcService);
             }
         } else {
-            response = invokeRpc(payload.getData(), schemaPath, mountPoint);
             schemaContextRef = modelContext(mountPoint);
+            future = invokeRpc(payload.getData(), rpcName, mountPoint);
         }
 
-        final DOMRpcResult result = checkResponse(response);
+        Futures.addCallback(future, new FutureCallback<DOMRpcResult>() {
+            @Override
+            public void onSuccess(final DOMRpcResult response) {
+                final var errors = response.getErrors();
+                if (!errors.isEmpty()) {
+                    LOG.debug("RpcError message {}", response.getErrors());
+                    ar.resume(new RestconfDocumentedException("RPCerror message ", null, response.getErrors()));
+                    return;
+                }
 
-        RpcDefinition resultNodeSchema = null;
-        NormalizedNode resultData = null;
-        if (result != null && result.getResult() != null) {
-            resultData = result.getResult();
-            resultNodeSchema = (RpcDefinition) payload.getInstanceIdentifierContext().getSchemaNode();
-        }
+                final NormalizedNode resultData = response.getResult();
+                if (resultData == null || ((ContainerNode) resultData).isEmpty()) {
+                    ar.resume(new WebApplicationException(Status.NO_CONTENT));
+                } else {
+                    ar.resume(new NormalizedNodeContext(new InstanceIdentifierContext<>(null, (RpcDefinition) schema,
+                        mountPoint, schemaContextRef), resultData));
+                }
+            }
 
-        if (resultData != null && ((ContainerNode) resultData).isEmpty()) {
-            throw new WebApplicationException(Response.Status.NO_CONTENT);
-        } else {
-            return new NormalizedNodeContext(new InstanceIdentifierContext<>(null, resultNodeSchema, mountPoint,
-                    schemaContextRef), resultData);
-        }
+            @Override
+            public void onFailure(final Throwable failure) {
+                ar.resume(failure);
+            }
+        }, MoreExecutors.directExecutor());
     }
 
     /**
@@ -122,9 +135,9 @@ public class RestconfInvokeOperationsServiceImpl implements RestconfInvokeOperat
      * @param rpc RPC type
      * @return {@link DOMRpcResult}
      */
-    // FIXME: NETCONF-718: we should be returning a future here
     @VisibleForTesting
-    static DOMRpcResult invokeRpc(final NormalizedNode data, final QName rpc, final DOMMountPoint mountPoint) {
+    static ListenableFuture<? extends DOMRpcResult> invokeRpc(final NormalizedNode data, final QName rpc,
+            final DOMMountPoint mountPoint) {
         return invokeRpc(data, rpc, mountPoint.getService(DOMRpcService.class).orElseThrow(() -> {
             final String errmsg = "RPC service is missing.";
             LOG.debug(errmsg);
@@ -140,14 +153,14 @@ public class RestconfInvokeOperationsServiceImpl implements RestconfInvokeOperat
      * @param rpcService rpc service to invoke rpc
      * @return {@link DOMRpcResult}
      */
-    // FIXME: NETCONF-718: we should be returning a future here
     @VisibleForTesting
-    static DOMRpcResult invokeRpc(final NormalizedNode data, final QName rpc, final DOMRpcService rpcService) {
-        return checkedGet(Futures.catching(
-            rpcService.invokeRpc(rpc, nonnullInput(rpc, data)), DOMRpcException.class,
+    static ListenableFuture<? extends DOMRpcResult> invokeRpc(final NormalizedNode data, final QName rpc,
+            final DOMRpcService rpcService) {
+        return Futures.catching(rpcService.invokeRpc(rpc, nonnullInput(rpc, data)),
+            DOMRpcException.class,
             cause -> new DefaultDOMRpcResult(ImmutableList.of(RpcResultBuilder.newError(
                 RpcError.ErrorType.RPC, "operation-failed", cause.getMessage()))),
-            MoreExecutors.directExecutor()));
+            MoreExecutors.directExecutor());
     }
 
     private static @NonNull NormalizedNode nonnullInput(final QName type, final NormalizedNode input) {
@@ -155,30 +168,6 @@ public class RestconfInvokeOperationsServiceImpl implements RestconfInvokeOperat
                 : ImmutableNodes.containerNode(YangConstants.operationInputQName(type.getModule()));
     }
 
-    /**
-     * Check the validity of the result.
-     *
-     * @param response response of rpc
-     * @return {@link DOMRpcResult} result
-     */
-    @VisibleForTesting
-    static DOMRpcResult checkResponse(final DOMRpcResult response) {
-        if (response == null) {
-            return null;
-        }
-        try {
-            if (response.getErrors().isEmpty()) {
-                return response;
-            }
-            LOG.debug("RpcError message {}", response.getErrors());
-            throw new RestconfDocumentedException("RPCerror message ", null, response.getErrors());
-        } catch (final CancellationException e) {
-            final String errMsg = "The operation was cancelled while executing.";
-            LOG.debug("Cancel RpcExecution: {}", errMsg, e);
-            throw new RestconfDocumentedException(errMsg, ErrorType.RPC, ErrorTag.PARTIAL_OPERATION, e);
-        }
-    }
-
     @Deprecated
     static <T> T checkedGet(final ListenableFuture<T> future) {
         try {
index 77a52811d24d223af26a649eb3a406ffe1603183..03eec66926a218bd45115affe1692344bb7ecb9d 100644 (file)
@@ -50,6 +50,7 @@ public class WebInitializer {
             .addServlet(ServletDetails.builder()
                 .addUrlPattern(RestconfConstants.BASE_URI_PATTERN + "/*")
                 .servlet(servletSupport.createHttpServletBuilder(webApp).build())
+                .asyncSupported(true)
                 .build())
             .addServlet(ServletDetails.builder()
                 .addUrlPattern(RestconfConstants.BASE_URI_PATTERN + "/notif/*")
index ccfba855ca6762959b49fcc08dd6abd22023af25..0c6583f002c1c552b2c0259ce8a7e2a1b0cea989 100644 (file)
@@ -7,14 +7,16 @@
  */
 package org.opendaylight.restconf.nb.rfc8040.rests.services.impl;
 
+import static org.hamcrest.CoreMatchers.instanceOf;
+import static org.hamcrest.MatcherAssert.assertThat;
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertSame;
 import static org.junit.Assert.assertThrows;
 import static org.junit.Assert.assertTrue;
-import static org.mockito.Mockito.doCallRealMethod;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 import static org.opendaylight.yangtools.util.concurrent.FluentFutures.immediateFailedFluentFuture;
 import static org.opendaylight.yangtools.util.concurrent.FluentFutures.immediateFluentFuture;
@@ -22,12 +24,15 @@ import static org.opendaylight.yangtools.util.concurrent.FluentFutures.immediate
 import java.util.Collection;
 import java.util.List;
 import java.util.Optional;
+import java.util.concurrent.ExecutionException;
 import javax.ws.rs.WebApplicationException;
-import javax.ws.rs.core.Response;
+import javax.ws.rs.container.AsyncResponse;
+import javax.ws.rs.core.Response.Status;
 import javax.ws.rs.core.UriInfo;
 import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
+import org.mockito.ArgumentCaptor;
 import org.mockito.Mock;
 import org.mockito.junit.MockitoJUnitRunner;
 import org.opendaylight.mdsal.common.api.CommitInfo;
@@ -49,7 +54,6 @@ import org.opendaylight.yangtools.yang.common.QName;
 import org.opendaylight.yangtools.yang.common.RpcError;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifier;
 import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
-import org.opendaylight.yangtools.yang.data.api.schema.LeafNode;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
 import org.opendaylight.yangtools.yang.data.impl.schema.Builders;
 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
@@ -94,55 +98,48 @@ public class RestconfInvokeOperationsServiceImplTest {
 
     @Test
     public void testInvokeRpcWithNonEmptyOutput() {
-        final String identifier = "invoke-rpc-module:rpcTest";
         final ContainerNode result = mock(ContainerNode.class);
-        final LeafNode<?> outputChild = mock(LeafNode.class);
-        doCallRealMethod().when(result).isEmpty();
-        doReturn(List.of(outputChild)).when(result).body();
+        doReturn(false).when(result).isEmpty();
 
-        final NormalizedNodeContext payload = prepNNC(result);
-        final UriInfo uriInfo = mock(UriInfo.class);
+        final AsyncResponse ar = mock(AsyncResponse.class);
+        final ArgumentCaptor<NormalizedNodeContext> response = ArgumentCaptor.forClass(NormalizedNodeContext.class);
+        invokeOperationsService.invokeRpc("invoke-rpc-module:rpcTest", prepNNC(result), mock(UriInfo.class), ar);
+        verify(ar).resume(response.capture());
 
-        final NormalizedNodeContext rpc = this.invokeOperationsService.invokeRpc(identifier, payload, uriInfo);
-        assertEquals(result, rpc.getData());
+        assertSame(result, response.getValue().getData());
     }
 
     @Test
     public void testInvokeRpcWithEmptyOutput() {
-        final String identifier = "invoke-rpc-module:rpcTest";
         final ContainerNode result = mock(ContainerNode.class);
         doReturn(true).when(result).isEmpty();
 
-        final NormalizedNodeContext payload = prepNNC(result);
-        final UriInfo uriInfo = mock(UriInfo.class);
+        final AsyncResponse ar = mock(AsyncResponse.class);
+        final ArgumentCaptor<Throwable> response = ArgumentCaptor.forClass(Throwable.class);
+        invokeOperationsService.invokeRpc("invoke-rpc-module:rpcTest", prepNNC(result), mock(UriInfo.class), ar);
+        verify(ar).resume(response.capture());
 
-        WebApplicationException exceptionToBeThrown = null;
-        try {
-            this.invokeOperationsService.invokeRpc(identifier, payload, uriInfo);
-        } catch (final WebApplicationException exception) {
-            exceptionToBeThrown = exception;
-
-        }
-        assertNotNull("WebApplicationException with status code 204 is expected.", exceptionToBeThrown);
-        assertEquals(Response.Status.NO_CONTENT.getStatusCode(), exceptionToBeThrown.getResponse().getStatus());
+        final Throwable failure = response.getValue();
+        assertThat(failure, instanceOf(WebApplicationException.class));
+        assertEquals(Status.NO_CONTENT.getStatusCode(), ((WebApplicationException) failure).getResponse().getStatus());
     }
 
     @Test
-    public void invokeRpcTest() {
+    public void invokeRpcTest() throws InterruptedException, ExecutionException {
         final DOMRpcResult mockResult = new DefaultDOMRpcResult(OUTPUT, List.of());
         doReturn(immediateFluentFuture(mockResult)).when(rpcService).invokeRpc(RPC, INPUT);
-        final DOMRpcResult rpcResult = RestconfInvokeOperationsServiceImpl.invokeRpc(INPUT, RPC, rpcService);
+        final DOMRpcResult rpcResult = RestconfInvokeOperationsServiceImpl.invokeRpc(INPUT, RPC, rpcService).get();
         assertTrue(rpcResult.getErrors().isEmpty());
         assertEquals(OUTPUT, rpcResult.getResult());
     }
 
     @Test
-    public void invokeRpcErrorsAndCheckTestTest() {
+    public void invokeRpcErrorsAndCheckTestTest() throws InterruptedException, ExecutionException {
         final QName errorRpc = QName.create(RPC, "error-rpc");
         final DOMRpcException exception = new DOMRpcImplementationNotAvailableException(
                 "No implementation of RPC " + errorRpc + " available.");
         doReturn(immediateFailedFluentFuture(exception)).when(rpcService).invokeRpc(errorRpc, INPUT);
-        final DOMRpcResult rpcResult = RestconfInvokeOperationsServiceImpl.invokeRpc(INPUT, errorRpc, rpcService);
+        final DOMRpcResult rpcResult = RestconfInvokeOperationsServiceImpl.invokeRpc(INPUT, errorRpc, rpcService).get();
         assertNull(rpcResult.getResult());
         final Collection<? extends RpcError> errorList = rpcResult.getErrors();
         assertEquals(1, errorList.size());
@@ -150,17 +147,14 @@ public class RestconfInvokeOperationsServiceImplTest {
         assertEquals("No implementation of RPC " + errorRpc + " available.", actual.getMessage());
         assertEquals("operation-failed", actual.getTag());
         assertEquals(RpcError.ErrorType.RPC, actual.getErrorType());
-
-        assertThrows(RestconfDocumentedException.class,
-            () -> RestconfInvokeOperationsServiceImpl.checkResponse(rpcResult));
     }
 
     @Test
-    public void invokeRpcViaMountPointTest() {
+    public void invokeRpcViaMountPointTest() throws InterruptedException, ExecutionException {
         doReturn(Optional.ofNullable(rpcService)).when(mountPoint).getService(DOMRpcService.class);
         final DOMRpcResult mockResult = new DefaultDOMRpcResult(OUTPUT, List.of());
         doReturn(immediateFluentFuture(mockResult)).when(rpcService).invokeRpc(RPC, INPUT);
-        final DOMRpcResult rpcResult = RestconfInvokeOperationsServiceImpl.invokeRpc(INPUT, RPC, mountPoint);
+        final DOMRpcResult rpcResult = RestconfInvokeOperationsServiceImpl.invokeRpc(INPUT, RPC, mountPoint).get();
         assertTrue(rpcResult.getErrors().isEmpty());
         assertEquals(OUTPUT, rpcResult.getResult());
     }
@@ -173,13 +167,12 @@ public class RestconfInvokeOperationsServiceImplTest {
     }
 
     @Test
-    public void checkResponseTest() {
-        final DOMRpcResult mockResult = new DefaultDOMRpcResult(OUTPUT, List.of());
-        doReturn(immediateFluentFuture(mockResult)).when(rpcService).invokeRpc(RPC, INPUT);
-        final DOMRpcResult rpcResult = RestconfInvokeOperationsServiceImpl.invokeRpc(INPUT, RPC, rpcService);
+    public void checkResponseTest() throws InterruptedException, ExecutionException {
+        doReturn(immediateFluentFuture(new DefaultDOMRpcResult(OUTPUT, List.of())))
+            .when(rpcService).invokeRpc(RPC, INPUT);
+        final DOMRpcResult rpcResult = RestconfInvokeOperationsServiceImpl.invokeRpc(INPUT, RPC, rpcService).get();
         assertTrue(rpcResult.getErrors().isEmpty());
         assertEquals(OUTPUT, rpcResult.getResult());
-        assertNotNull(RestconfInvokeOperationsServiceImpl.checkResponse(rpcResult));
     }
 
     private NormalizedNodeContext prepNNC(final NormalizedNode result) {