Use RestconfFuture/AsyncResponse for postData() 76/108276/6
authorRobert Varga <robert.varga@pantheon.tech>
Mon, 9 Oct 2023 08:19:26 +0000 (10:19 +0200)
committerRobert Varga <robert.varga@pantheon.tech>
Tue, 10 Oct 2023 07:53:12 +0000 (09:53 +0200)
We now have the tooling we need to make POST requests asynchronous, at
least from the JAX-RS perspective.

This patch provides the first-order wiring change, allowing
RestconfStrategy to completely control how the request is dispatched.
Current implementation eliminates synchronous wait for the commit
future, but leaves other blocking calls as they are.

JIRA: NETCONF-718
Change-Id: Iefcbe31b73b9ed85527ce57d75f29145dbc2ab4c
Signed-off-by: Robert Varga <robert.varga@pantheon.tech>
restconf/restconf-nb/src/main/java/org/opendaylight/restconf/nb/rfc8040/rests/services/impl/RestconfDataServiceImpl.java
restconf/restconf-nb/src/main/java/org/opendaylight/restconf/nb/rfc8040/rests/transactions/RestconfStrategy.java
restconf/restconf-nb/src/test/java/org/opendaylight/restconf/nb/rfc8040/rests/services/impl/Netconf799Test.java
restconf/restconf-nb/src/test/java/org/opendaylight/restconf/nb/rfc8040/rests/services/impl/RestconfDataServiceImplTest.java
restconf/restconf-nb/src/test/java/org/opendaylight/restconf/nb/rfc8040/rests/transactions/AbstractRestconfStrategyTest.java

index b17f2c6a2526d8b96735ef2d9e349d17abe1cc62..b5e968147ebdacec6ff0f3f55381066a0b2274a9 100644 (file)
@@ -16,7 +16,7 @@ import static org.opendaylight.restconf.nb.rfc8040.rests.utils.RestconfStreamsCo
 import static org.opendaylight.restconf.nb.rfc8040.rests.utils.RestconfStreamsConstants.STREAM_PATH_PART;
 
 import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Throwables;
+import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.MoreExecutors;
 import java.io.IOException;
@@ -59,6 +59,8 @@ import org.opendaylight.mdsal.dom.api.DOMDataTreeWriteOperations;
 import org.opendaylight.mdsal.dom.api.DOMMountPoint;
 import org.opendaylight.mdsal.dom.spi.SimpleDOMActionResult;
 import org.opendaylight.restconf.common.errors.RestconfDocumentedException;
+import org.opendaylight.restconf.common.errors.RestconfFuture;
+import org.opendaylight.restconf.common.errors.SettableRestconfFuture;
 import org.opendaylight.restconf.common.patch.PatchContext;
 import org.opendaylight.restconf.common.patch.PatchStatusContext;
 import org.opendaylight.restconf.nb.rfc8040.MediaTypes;
@@ -93,6 +95,7 @@ import org.opendaylight.yangtools.yang.common.QName;
 import org.opendaylight.yangtools.yang.common.Revision;
 import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.PathArgument;
 import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
 import org.opendaylight.yangtools.yang.data.api.schema.MapEntryNode;
 import org.opendaylight.yangtools.yang.data.api.schema.MapNode;
@@ -389,7 +392,7 @@ public final class RestconfDataServiceImpl {
      *
      * @param body data node for put to config DS
      * @param uriInfo URI info
-     * @return {@link Response}
+     * @param ar {@link AsyncResponse} which needs to be completed
      */
     @POST
     @Path("/data")
@@ -397,9 +400,10 @@ public final class RestconfDataServiceImpl {
         MediaTypes.APPLICATION_YANG_DATA_JSON,
         MediaType.APPLICATION_JSON,
     })
-    public Response postDataJSON(final InputStream body, @Context final UriInfo uriInfo) {
+    public void postDataJSON(final InputStream body, @Context final UriInfo uriInfo,
+            @Suspended final AsyncResponse ar) {
         try (var jsonBody = new JsonChildBody(body)) {
-            return postData(jsonBody, uriInfo);
+            postData(jsonBody, uriInfo, ar);
         }
     }
 
@@ -409,7 +413,7 @@ public final class RestconfDataServiceImpl {
      * @param identifier path to target
      * @param body data node for put to config DS
      * @param uriInfo URI info
-     * @return {@link Response}
+     * @param ar {@link AsyncResponse} which needs to be completed
      */
     @POST
     @Path("/data/{identifier:.+}")
@@ -417,18 +421,18 @@ public final class RestconfDataServiceImpl {
         MediaTypes.APPLICATION_YANG_DATA_JSON,
         MediaType.APPLICATION_JSON,
     })
-    public Response postDataJSON(@Encoded @PathParam("identifier") final String identifier, final InputStream body,
-            @Context final UriInfo uriInfo) {
+    public void postDataJSON(@Encoded @PathParam("identifier") final String identifier, final InputStream body,
+            @Context final UriInfo uriInfo, @Suspended final AsyncResponse ar) {
         final var reqPath = server.bindRequestPath(databindProvider.currentContext(), identifier);
         if (reqPath.getSchemaNode() instanceof ActionDefinition) {
             try (var jsonBody = new JsonOperationInputBody(body)) {
-                return invokeAction(reqPath, jsonBody);
+                invokeAction(reqPath, jsonBody, ar);
+            }
+        } else {
+            try (var jsonBody = new JsonChildBody(body)) {
+                postData(reqPath.inference(), reqPath.getInstanceIdentifier(), jsonBody, uriInfo,
+                    reqPath.getMountPoint(), ar);
             }
-        }
-
-        try (var jsonBody = new JsonChildBody(body)) {
-            return postData(reqPath.inference(), reqPath.getInstanceIdentifier(), jsonBody, uriInfo,
-                reqPath.getMountPoint());
         }
     }
 
@@ -437,7 +441,7 @@ public final class RestconfDataServiceImpl {
      *
      * @param body data node for put to config DS
      * @param uriInfo URI info
-     * @return {@link Response}
+     * @param ar {@link AsyncResponse} which needs to be completed
      */
     @POST
     @Path("/data")
@@ -446,9 +450,9 @@ public final class RestconfDataServiceImpl {
         MediaType.APPLICATION_XML,
         MediaType.TEXT_XML
     })
-    public Response postDataXML(final InputStream body, @Context final UriInfo uriInfo) {
+    public void postDataXML(final InputStream body, @Context final UriInfo uriInfo, @Suspended final AsyncResponse ar) {
         try (var xmlBody = new XmlChildBody(body)) {
-            return postData(xmlBody, uriInfo);
+            postData(xmlBody, uriInfo, ar);
         }
     }
 
@@ -458,7 +462,7 @@ public final class RestconfDataServiceImpl {
      * @param identifier path to target
      * @param body data node for put to config DS
      * @param uriInfo URI info
-     * @return {@link Response}
+     * @param ar {@link AsyncResponse} which needs to be completed
      */
     @POST
     @Path("/data/{identifier:.+}")
@@ -467,41 +471,45 @@ public final class RestconfDataServiceImpl {
         MediaType.APPLICATION_XML,
         MediaType.TEXT_XML
     })
-    public Response postDataXML(@Encoded @PathParam("identifier") final String identifier, final InputStream body,
-            @Context final UriInfo uriInfo) {
+    public void postDataXML(@Encoded @PathParam("identifier") final String identifier, final InputStream body,
+            @Context final UriInfo uriInfo, @Suspended final AsyncResponse ar) {
         final var reqPath = server.bindRequestPath(databindProvider.currentContext(), identifier);
         if (reqPath.getSchemaNode() instanceof ActionDefinition) {
             try (var xmlBody = new XmlOperationInputBody(body)) {
-                return invokeAction(reqPath, xmlBody);
+                invokeAction(reqPath, xmlBody, ar);
+            }
+        } else {
+            try (var xmlBody = new XmlChildBody(body)) {
+                postData(reqPath.inference(), reqPath.getInstanceIdentifier(), xmlBody, uriInfo,
+                    reqPath.getMountPoint(), ar);
             }
-        }
-
-        try (var xmlBody = new XmlChildBody(body)) {
-            return postData(reqPath.inference(), reqPath.getInstanceIdentifier(), xmlBody, uriInfo,
-                reqPath.getMountPoint());
         }
     }
 
-    private Response postData(final ChildBody body, final UriInfo uriInfo) {
-        return postData(Inference.ofDataTreePath(databindProvider.currentContext().modelContext()),
-            YangInstanceIdentifier.of(), body, uriInfo, null);
+    private void postData(final ChildBody body, final UriInfo uriInfo, final AsyncResponse ar) {
+        postData(Inference.ofDataTreePath(databindProvider.currentContext().modelContext()),
+            YangInstanceIdentifier.of(), body, uriInfo, null, ar);
     }
 
-    private Response postData(final Inference inference, final YangInstanceIdentifier parentPath, final ChildBody body,
-            final UriInfo uriInfo, final @Nullable DOMMountPoint mountPoint) {
+    private void postData(final Inference inference, final YangInstanceIdentifier parentPath, final ChildBody body,
+            final UriInfo uriInfo, final @Nullable DOMMountPoint mountPoint, final AsyncResponse ar) {
         final var modelContext = inference.getEffectiveModelContext();
         final var insert = QueryParams.parseInsert(modelContext, uriInfo);
         final var strategy = server.getRestconfStrategy(modelContext, mountPoint);
-        var path = parentPath;
-        final var payload = body.toPayload(path, inference);
+        final var payload = body.toPayload(parentPath, inference);
         final var data = payload.body();
+        final var path = concat(parentPath, payload.prefix());
 
-        for (var arg : payload.prefix()) {
-            path = path.node(arg);
-        }
+        strategy.postData(path, data, insert).addCallback(new JaxRsRestconfCallback<>(ar,
+            ignored -> Response.created(resolveLocation(uriInfo, path, modelContext, data)).build()));
+    }
 
-        strategy.postData(path, data, insert);
-        return Response.created(resolveLocation(uriInfo, path, modelContext, data)).build();
+    private static YangInstanceIdentifier concat(final YangInstanceIdentifier parent, final List<PathArgument> args) {
+        var ret = parent;
+        for (var arg : args) {
+            ret = ret.node(arg);
+        }
+        return ret;
     }
 
     /**
@@ -793,9 +801,10 @@ public final class RestconfDataServiceImpl {
      * Invoke Action operation.
      *
      * @param payload {@link NormalizedNodePayload} - the body of the operation
-     * @return {@link NormalizedNodePayload} wrapped in {@link Response}
+     * @param ar {@link AsyncResponse} which needs to be completed with a NormalizedNodePayload
      */
-    private Response invokeAction(final InstanceIdentifierContext reqPath, final OperationInputBody body) {
+    private void invokeAction(final InstanceIdentifierContext reqPath, final OperationInputBody body,
+            final AsyncResponse ar) {
         final var yangIIdContext = reqPath.getInstanceIdentifier();
         final ContainerNode input;
         try {
@@ -811,13 +820,12 @@ public final class RestconfDataServiceImpl {
         final var schemaPath = inference.toSchemaInferenceStack().toSchemaNodeIdentifier();
         final var response = mountPoint != null ? invokeAction(input, schemaPath, yangIIdContext, mountPoint)
             : invokeAction(input, schemaPath, yangIIdContext, actionService);
-        final var result = checkActionResponse(response);
 
-        final var resultData = result != null ? result.getOutput().orElse(null) : null;
-        if (resultData == null || resultData.isEmpty()) {
-            return Response.status(Status.NO_CONTENT).build();
-        }
-        return Response.status(Status.OK).entity(new NormalizedNodePayload(inference, resultData)).build();
+        response.addCallback(new JaxRsRestconfCallback<>(ar, result -> {
+            final var output = result.getOutput().orElse(null);
+            return output == null || output.isEmpty() ? Response.status(Status.NO_CONTENT).build()
+                : Response.status(Status.OK).entity(new NormalizedNodePayload(inference, output)).build();
+        }));
     }
 
     /**
@@ -828,10 +836,11 @@ public final class RestconfDataServiceImpl {
      * @param schemaPath schema path of data
      * @return {@link DOMActionResult}
      */
-    private static DOMActionResult invokeAction(final ContainerNode data,
+    private static RestconfFuture<DOMActionResult> invokeAction(final ContainerNode data,
             final Absolute schemaPath, final YangInstanceIdentifier yangIId, final DOMMountPoint mountPoint) {
-        return invokeAction(data, schemaPath, yangIId, mountPoint.getService(DOMActionService.class)
-            .orElseThrow(() -> new RestconfDocumentedException("DomAction service is missing.")));
+        final var actionService = mountPoint.getService(DOMActionService.class);
+        return actionService.isPresent() ? invokeAction(data, schemaPath, yangIId, actionService.orElseThrow())
+            : RestconfFuture.failed(new RestconfDocumentedException("DOMActionService is missing."));
     }
 
     /**
@@ -843,48 +852,40 @@ public final class RestconfDataServiceImpl {
      * @param actionService action service to invoke action
      * @return {@link DOMActionResult}
      */
-    // FIXME: NETCONF-718: we should be returning a future here
-    private static DOMActionResult invokeAction(final ContainerNode data, final Absolute schemaPath,
+    private static RestconfFuture<DOMActionResult> invokeAction(final ContainerNode data, final Absolute schemaPath,
             final YangInstanceIdentifier yangIId, final DOMActionService actionService) {
-        final var future = Futures.catching(
-            actionService.invokeAction(schemaPath,
-                new DOMDataTreeIdentifier(LogicalDatastoreType.OPERATIONAL, yangIId.getParent()), data),
-            DOMActionException.class,
-            cause -> new SimpleDOMActionResult(List.of(RpcResultBuilder.newError(
-                ErrorType.RPC, ErrorTag.OPERATION_FAILED, cause.getMessage()))),
-            MoreExecutors.directExecutor());
-
-        try {
-            return future.get();
-        } catch (InterruptedException e) {
-            throw new RestconfDocumentedException("Interrupted while waiting for result of invocation", e);
-        } catch (ExecutionException e) {
-            Throwables.throwIfInstanceOf(e.getCause(), RestconfDocumentedException.class);
-            throw new RestconfDocumentedException("Invocation failed", e);
-        }
-    }
-
-    /**
-     * Check the validity of the result.
-     *
-     * @param response response of Action
-     * @return {@link DOMActionResult} result
-     */
-    private static DOMActionResult checkActionResponse(final DOMActionResult response) {
-        if (response == null) {
-            return null;
-        }
-
-        try {
-            if (response.getErrors().isEmpty()) {
-                return response;
-            }
-            LOG.debug("InvokeAction Error Message {}", response.getErrors());
-            throw new RestconfDocumentedException("InvokeAction Error Message ", null, response.getErrors());
-        } catch (final CancellationException e) {
-            final String errMsg = "The Action Operation was cancelled while executing.";
-            LOG.debug("Cancel Execution: {}", errMsg, e);
-            throw new RestconfDocumentedException(errMsg, ErrorType.RPC, ErrorTag.PARTIAL_OPERATION, e);
-        }
+        final var ret = new SettableRestconfFuture<DOMActionResult>();
+
+        Futures.addCallback(actionService.invokeAction(schemaPath,
+            new DOMDataTreeIdentifier(LogicalDatastoreType.OPERATIONAL, yangIId.getParent()), data),
+            new FutureCallback<DOMActionResult>() {
+                @Override
+                public void onSuccess(final DOMActionResult result) {
+                    final var errors = result.getErrors();
+                    LOG.debug("InvokeAction Error Message {}", errors);
+                    if (errors.isEmpty()) {
+                        ret.set(result);
+                    } else {
+                        ret.setFailure(new RestconfDocumentedException("InvokeAction Error Message ", null, errors));
+                    }
+                }
+
+                @Override
+                public void onFailure(final Throwable cause) {
+                    if (cause instanceof DOMActionException) {
+                        ret.set(new SimpleDOMActionResult(List.of(RpcResultBuilder.newError(
+                            ErrorType.RPC, ErrorTag.OPERATION_FAILED, cause.getMessage()))));
+                    } else if (cause instanceof RestconfDocumentedException e) {
+                        ret.setFailure(e);
+                    } else if (cause instanceof CancellationException) {
+                        ret.setFailure(new RestconfDocumentedException("Action cancelled while executing",
+                            ErrorType.RPC, ErrorTag.PARTIAL_OPERATION, cause));
+                    } else {
+                        ret.setFailure(new RestconfDocumentedException("Invocation failed", cause));
+                    }
+                }
+            }, MoreExecutors.directExecutor());
+
+        return ret;
     }
 }
index e2efbd6a6ef4563ba3b5e1dc3789fc135aa2fdf6..958a06843c66c6f39962f22069c0fdc319ee4ccc 100644 (file)
@@ -356,8 +356,9 @@ public abstract class RestconfStrategy {
      * @param path    path
      * @param data    data
      * @param insert  {@link Insert}
+     * @return A {@link RestconfFuture}
      */
-    public final void postData(final YangInstanceIdentifier path, final NormalizedNode data,
+    public final RestconfFuture<Empty> postData(final YangInstanceIdentifier path, final NormalizedNode data,
             final @Nullable Insert insert) {
         final ListenableFuture<? extends CommitInfo> future;
         if (insert != null) {
@@ -367,7 +368,21 @@ public abstract class RestconfStrategy {
         } else {
             future = createAndCommit(prepareWriteExecution(), path, data);
         }
-        TransactionUtil.syncCommit(future, "POST", path);
+        final var ret = new SettableRestconfFuture<Empty>();
+
+        Futures.addCallback(future, new FutureCallback<CommitInfo>() {
+            @Override
+            public void onSuccess(final CommitInfo result) {
+                ret.set(Empty.value());
+            }
+
+            @Override
+            public void onFailure(final Throwable cause) {
+                ret.setFailure(TransactionUtil.decodeException(cause, "POST", path));
+            }
+        }, MoreExecutors.directExecutor());
+
+        return ret;
     }
 
     private ListenableFuture<? extends CommitInfo> insertAndCommitPost(final YangInstanceIdentifier path,
index 5a39d2315f35299ea719182d5d0584eb431b8c27..ff334685cb6585ec7e6b1e9d2e37eab9ce9b50ee 100644 (file)
@@ -14,8 +14,12 @@ import static org.mockito.ArgumentMatchers.eq;
 import static org.mockito.Mockito.doReturn;
 
 import com.google.common.util.concurrent.Futures;
+import javax.ws.rs.container.AsyncResponse;
+import javax.ws.rs.core.Response;
 import org.junit.Test;
 import org.junit.runner.RunWith;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Captor;
 import org.mockito.Mock;
 import org.mockito.junit.MockitoJUnitRunner;
 import org.opendaylight.mdsal.dom.api.DOMActionService;
@@ -47,6 +51,10 @@ public class Netconf799Test extends AbstractInstanceIdentifierTest {
     private DOMMountPointService mountPointService;
     @Mock
     private RestconfStreamsSubscriptionService restconfStreamSubService;
+    @Mock
+    private AsyncResponse asyncResponse;
+    @Captor
+    private ArgumentCaptor<Response> captor;
 
     @Test
     public void testInvokeAction() {
@@ -58,13 +66,15 @@ public class Netconf799Test extends AbstractInstanceIdentifierTest {
             new MdsalRestconfServer(dataBroker, rpcService, mountPointService), dataBroker, restconfStreamSubService,
             actionService, new ListenersBroker(), new StreamsConfiguration(0, 1, 0, false));
 
-        final var response = dataService.postDataJSON("instance-identifier-module:cont/cont1/reset",
+        doReturn(true).when(asyncResponse).resume(captor.capture());
+        dataService.postDataJSON("instance-identifier-module:cont/cont1/reset",
             stringInputStream("""
             {
               "instance-identifier-module:input": {
                 "delay": 600
               }
-            }"""), null);
+            }"""), null, asyncResponse);
+        final var response = captor.getValue();
         assertEquals(204, response.getStatus());
         assertNull(response.getEntity());
     }
index 0967368b365ca422f4523479b928395f898ff09b..0116050ed49ce4142f1a885ec613c758788b5a85 100644 (file)
@@ -335,11 +335,14 @@ public class RestconfDataServiceImplTest extends AbstractJukeboxTest {
             Builders.containerBuilder().withNodeIdentifier(new NodeIdentifier(JUKEBOX_QNAME)).build());
         doReturn(UriBuilder.fromUri("http://localhost:8181/rests/")).when(uriInfo).getBaseUriBuilder();
 
-        final var response = dataService.postDataJSON(new ByteArrayInputStream("""
+        final var captor = ArgumentCaptor.forClass(Response.class);
+        doReturn(true).when(asyncResponse).resume(captor.capture());
+        dataService.postDataJSON(stringInputStream("""
             {
               "example-jukebox:jukebox" : {
               }
-            }""".getBytes(StandardCharsets.UTF_8)), uriInfo);
+            }"""), uriInfo, asyncResponse);
+        final var response = captor.getValue();
         assertEquals(201, response.getStatus());
         assertEquals(URI.create("http://localhost:8181/rests/data/example-jukebox:jukebox"), response.getLocation());
     }
@@ -352,14 +355,16 @@ public class RestconfDataServiceImplTest extends AbstractJukeboxTest {
         doNothing().when(readWrite).put(LogicalDatastoreType.CONFIGURATION, node, BAND_ENTRY);
         doReturn(UriBuilder.fromUri("http://localhost:8181/rests/")).when(uriInfo).getBaseUriBuilder();
 
-        final var response = dataService.postDataJSON("example-jukebox:jukebox", new ByteArrayInputStream("""
+        final var captor = ArgumentCaptor.forClass(Response.class);
+        doReturn(true).when(asyncResponse).resume(captor.capture());
+        dataService.postDataJSON("example-jukebox:jukebox", stringInputStream("""
             {
               "example-jukebox:playlist" : {
                 "name" : "name of band",
                 "description" : "band description"
               }
-            }""".getBytes(StandardCharsets.UTF_8)),
-            uriInfo);
+            }"""), uriInfo, asyncResponse);
+        final var response = captor.getValue();
         assertEquals(201, response.getStatus());
         assertEquals(URI.create("http://localhost:8181/rests/data/example-jukebox:jukebox/playlist=name%20of%20band"),
             response.getLocation());
index 50878d6efef3bfb06832a0605bb5b18ffaa74b13..1c52f527eb9abd8d3907c692ab122d86dcaf5cd5 100644 (file)
@@ -258,11 +258,12 @@ abstract class AbstractRestconfStrategyTest extends AbstractJukeboxTest {
     @Test
     public final void testPostDataFail() {
         final var domException = new DOMException((short) 414, "Post request failed");
-
-        RestconfDocumentedException ex = assertThrows(RestconfDocumentedException.class,
-            () -> testPostDataFailStrategy(domException).postData(JUKEBOX_IID, EMPTY_JUKEBOX, null));
-        assertEquals(1, ex.getErrors().size());
-        assertThat(ex.getErrors().get(0).getErrorInfo(), containsString(domException.getMessage()));
+        final var future = testPostDataFailStrategy(domException).postData(JUKEBOX_IID, EMPTY_JUKEBOX, null);
+        final var cause = assertThrows(ExecutionException.class, () -> Futures.getDone(future)).getCause();
+        assertThat(cause, instanceOf(RestconfDocumentedException.class));
+        final var errors = ((RestconfDocumentedException) cause).getErrors();
+        assertEquals(1, errors.size());
+        assertThat(errors.get(0).getErrorInfo(), containsString(domException.getMessage()));
     }
 
     abstract @NonNull RestconfStrategy testPostDataFailStrategy(DOMException domException);