Refactor NormalizedNodePayload
[netconf.git] / restconf / restconf-nb / src / main / java / org / opendaylight / restconf / nb / rfc8040 / rests / services / impl / RestconfInvokeOperationsServiceImpl.java
1 /*
2  * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.restconf.nb.rfc8040.rests.services.impl;
9
10 import static java.util.Objects.requireNonNull;
11
12 import com.google.common.annotations.VisibleForTesting;
13 import com.google.common.base.Throwables;
14 import com.google.common.util.concurrent.FutureCallback;
15 import com.google.common.util.concurrent.Futures;
16 import com.google.common.util.concurrent.ListenableFuture;
17 import com.google.common.util.concurrent.MoreExecutors;
18 import java.io.IOException;
19 import java.io.InputStream;
20 import java.util.List;
21 import java.util.concurrent.ExecutionException;
22 import javax.ws.rs.Consumes;
23 import javax.ws.rs.Encoded;
24 import javax.ws.rs.POST;
25 import javax.ws.rs.Path;
26 import javax.ws.rs.PathParam;
27 import javax.ws.rs.Produces;
28 import javax.ws.rs.WebApplicationException;
29 import javax.ws.rs.container.AsyncResponse;
30 import javax.ws.rs.container.Suspended;
31 import javax.ws.rs.core.Context;
32 import javax.ws.rs.core.MediaType;
33 import javax.ws.rs.core.Response.Status;
34 import javax.ws.rs.core.UriInfo;
35 import org.eclipse.jdt.annotation.NonNull;
36 import org.opendaylight.mdsal.dom.api.DOMMountPoint;
37 import org.opendaylight.mdsal.dom.api.DOMMountPointService;
38 import org.opendaylight.mdsal.dom.api.DOMRpcException;
39 import org.opendaylight.mdsal.dom.api.DOMRpcResult;
40 import org.opendaylight.mdsal.dom.api.DOMRpcService;
41 import org.opendaylight.mdsal.dom.spi.DefaultDOMRpcResult;
42 import org.opendaylight.restconf.common.errors.RestconfDocumentedException;
43 import org.opendaylight.restconf.nb.rfc8040.MediaTypes;
44 import org.opendaylight.restconf.nb.rfc8040.databind.DatabindProvider;
45 import org.opendaylight.restconf.nb.rfc8040.databind.JsonOperationInputBody;
46 import org.opendaylight.restconf.nb.rfc8040.databind.OperationInputBody;
47 import org.opendaylight.restconf.nb.rfc8040.databind.XmlOperationInputBody;
48 import org.opendaylight.restconf.nb.rfc8040.legacy.NormalizedNodePayload;
49 import org.opendaylight.restconf.nb.rfc8040.streams.StreamsConfiguration;
50 import org.opendaylight.restconf.nb.rfc8040.utils.parser.ParserIdentifier;
51 import org.opendaylight.yang.gen.v1.urn.opendaylight.device.notification.rev221106.SubscribeDeviceNotification;
52 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.remote.rev140114.CreateDataChangeEventSubscription;
53 import org.opendaylight.yangtools.yang.common.ErrorTag;
54 import org.opendaylight.yangtools.yang.common.ErrorType;
55 import org.opendaylight.yangtools.yang.common.QName;
56 import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
57 import org.opendaylight.yangtools.yang.common.YangConstants;
58 import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
59 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
60 import org.slf4j.Logger;
61 import org.slf4j.LoggerFactory;
62
63 /**
64  * An operation resource represents a protocol operation defined with the YANG {@code rpc} statement. It is invoked
65  * using a POST method on the operation resource.
66  */
67 @Path("/")
68 public final class RestconfInvokeOperationsServiceImpl {
69     private static final Logger LOG = LoggerFactory.getLogger(RestconfInvokeOperationsServiceImpl.class);
70
71     private final DatabindProvider databindProvider;
72     private final DOMRpcService rpcService;
73     private final DOMMountPointService mountPointService;
74     private final SubscribeToStreamUtil streamUtils;
75
76     public RestconfInvokeOperationsServiceImpl(final DatabindProvider databindProvider, final DOMRpcService rpcService,
77             final DOMMountPointService mountPointService, final StreamsConfiguration configuration) {
78         this.databindProvider = requireNonNull(databindProvider);
79         this.rpcService = requireNonNull(rpcService);
80         this.mountPointService = requireNonNull(mountPointService);
81         streamUtils = configuration.useSSE() ? SubscribeToStreamUtil.serverSentEvents()
82             : SubscribeToStreamUtil.webSockets();
83     }
84
85     /**
86      * Invoke RPC operation.
87      *
88      * @param identifier module name and rpc identifier string for the desired operation
89      * @param body the body of the operation
90      * @param uriInfo URI info
91      * @param ar {@link AsyncResponse} which needs to be completed with a {@link NormalizedNodePayload} output
92      */
93     @POST
94     // FIXME: identifier is just a *single* QName
95     @Path("/operations/{identifier:.+}")
96     @Consumes({
97         MediaTypes.APPLICATION_YANG_DATA_XML,
98         MediaType.APPLICATION_XML,
99         MediaType.TEXT_XML
100     })
101     @Produces({
102         MediaTypes.APPLICATION_YANG_DATA_JSON,
103         MediaTypes.APPLICATION_YANG_DATA_XML,
104         MediaType.APPLICATION_JSON,
105         MediaType.APPLICATION_XML,
106         MediaType.TEXT_XML
107     })
108     public void invokeRpcXML(@Encoded @PathParam("identifier") final String identifier, final InputStream body,
109             @Context final UriInfo uriInfo, @Suspended final AsyncResponse ar) {
110         try (var xmlBody = new XmlOperationInputBody(body)) {
111             invokeRpc(identifier, uriInfo, ar, xmlBody);
112         }
113     }
114
115     /**
116      * Invoke RPC operation.
117      *
118      * @param identifier module name and rpc identifier string for the desired operation
119      * @param body the body of the operation
120      * @param uriInfo URI info
121      * @param ar {@link AsyncResponse} which needs to be completed with a {@link NormalizedNodePayload} output
122      */
123     @POST
124     // FIXME: identifier is just a *single* QName
125     @Path("/operations/{identifier:.+}")
126     @Consumes({
127         MediaTypes.APPLICATION_YANG_DATA_JSON,
128         MediaType.APPLICATION_JSON,
129     })
130     @Produces({
131         MediaTypes.APPLICATION_YANG_DATA_JSON,
132         MediaTypes.APPLICATION_YANG_DATA_XML,
133         MediaType.APPLICATION_JSON,
134         MediaType.APPLICATION_XML,
135         MediaType.TEXT_XML
136     })
137     public void invokeRpcJSON(@Encoded @PathParam("identifier") final String identifier, final InputStream body,
138             @Context final UriInfo uriInfo, @Suspended final AsyncResponse ar) {
139         try (var jsonBody = new JsonOperationInputBody(body)) {
140             invokeRpc(identifier, uriInfo, ar, jsonBody);
141         }
142     }
143
144     private void invokeRpc(final String identifier, final UriInfo uriInfo, final AsyncResponse ar,
145             final OperationInputBody body) {
146         final var dataBind = databindProvider.currentContext();
147         final var schemaContext = dataBind.modelContext();
148         final var context = ParserIdentifier.toInstanceIdentifier(identifier, schemaContext, mountPointService);
149
150         final ContainerNode input;
151         try {
152             input = body.toContainerNode(context.inference());
153         } catch (IOException e) {
154             LOG.debug("Error reading input", e);
155             throw new RestconfDocumentedException("Error parsing input: " + e.getMessage(), ErrorType.PROTOCOL,
156                     ErrorTag.MALFORMED_MESSAGE, e);
157         }
158         final var rpcName = context.getSchemaNode().getQName();
159
160         final ListenableFuture<? extends DOMRpcResult> future;
161         final var mountPoint = context.getMountPoint();
162         if (mountPoint == null) {
163             if (CreateDataChangeEventSubscription.QNAME.equals(rpcName)) {
164                 future = Futures.immediateFuture(CreateStreamUtil.createDataChangeNotifiStream(
165                     streamUtils.listenersBroker(), input, schemaContext));
166             } else if (SubscribeDeviceNotification.QNAME.equals(rpcName)) {
167                 final String baseUrl = streamUtils.prepareUriByStreamName(uriInfo, "").toString();
168                 future = Futures.immediateFuture(CreateStreamUtil.createDeviceNotificationListener(baseUrl, input,
169                     streamUtils, mountPointService));
170             } else {
171                 future = invokeRpc(input, rpcName, rpcService);
172             }
173         } else {
174             future = invokeRpc(input, rpcName, mountPoint);
175         }
176
177         Futures.addCallback(future, new FutureCallback<DOMRpcResult>() {
178             @Override
179             public void onSuccess(final DOMRpcResult response) {
180                 final var errors = response.errors();
181                 if (!errors.isEmpty()) {
182                     LOG.debug("RpcError message {}", response.errors());
183                     ar.resume(new RestconfDocumentedException("RPCerror message ", null, response.errors()));
184                     return;
185                 }
186
187                 final ContainerNode resultData = response.value();
188                 if (resultData == null || resultData.isEmpty()) {
189                     ar.resume(new WebApplicationException(Status.NO_CONTENT));
190                 } else {
191                     ar.resume(new NormalizedNodePayload(context.inference(), resultData));
192                 }
193             }
194
195             @Override
196             public void onFailure(final Throwable failure) {
197                 ar.resume(failure);
198             }
199         }, MoreExecutors.directExecutor());
200     }
201
202     /**
203      * Invoking rpc via mount point.
204      *
205      * @param mountPoint mount point
206      * @param data input data
207      * @param rpc RPC type
208      * @return {@link DOMRpcResult}
209      */
210     @VisibleForTesting
211     static ListenableFuture<? extends DOMRpcResult> invokeRpc(final ContainerNode data, final QName rpc,
212             final DOMMountPoint mountPoint) {
213         return invokeRpc(data, rpc, mountPoint.getService(DOMRpcService.class).orElseThrow(() -> {
214             final String errmsg = "RPC service is missing.";
215             LOG.debug(errmsg);
216             return new RestconfDocumentedException(errmsg);
217         }));
218     }
219
220     /**
221      * Invoke rpc.
222      *
223      * @param input input data
224      * @param rpc RPC type
225      * @param rpcService rpc service to invoke rpc
226      * @return {@link DOMRpcResult}
227      */
228     @VisibleForTesting
229     static ListenableFuture<? extends DOMRpcResult> invokeRpc(final ContainerNode input, final QName rpc,
230             final DOMRpcService rpcService) {
231         return Futures.catching(rpcService.invokeRpc(rpc, nonnullInput(rpc, input)), DOMRpcException.class,
232             cause -> new DefaultDOMRpcResult(List.of(RpcResultBuilder.newError(ErrorType.RPC, ErrorTag.OPERATION_FAILED,
233                 cause.getMessage()))),
234             MoreExecutors.directExecutor());
235     }
236
237     private static @NonNull ContainerNode nonnullInput(final QName type, final ContainerNode input) {
238         return input != null ? input
239                 : ImmutableNodes.containerNode(YangConstants.operationInputQName(type.getModule()));
240     }
241
242     @Deprecated
243     static <T> T checkedGet(final ListenableFuture<T> future) {
244         try {
245             return future.get();
246         } catch (InterruptedException e) {
247             throw new RestconfDocumentedException("Interrupted while waiting for result of invocation", e);
248         } catch (ExecutionException e) {
249             Throwables.throwIfInstanceOf(e.getCause(), RestconfDocumentedException.class);
250             throw new RestconfDocumentedException("Invocation failed", e);
251         }
252     }
253 }