Implement create-notification-stream
[netconf.git] / restconf / restconf-nb / src / main / java / org / opendaylight / restconf / nb / rfc8040 / rests / services / impl / RestconfInvokeOperationsServiceImpl.java
1 /*
2  * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.restconf.nb.rfc8040.rests.services.impl;
9
10 import static java.util.Objects.requireNonNull;
11
12 import java.io.IOException;
13 import java.io.InputStream;
14 import java.util.Optional;
15 import javax.ws.rs.Consumes;
16 import javax.ws.rs.Encoded;
17 import javax.ws.rs.POST;
18 import javax.ws.rs.Path;
19 import javax.ws.rs.PathParam;
20 import javax.ws.rs.Produces;
21 import javax.ws.rs.container.AsyncResponse;
22 import javax.ws.rs.container.Suspended;
23 import javax.ws.rs.core.Context;
24 import javax.ws.rs.core.MediaType;
25 import javax.ws.rs.core.Response;
26 import javax.ws.rs.core.UriInfo;
27 import org.opendaylight.mdsal.dom.api.DOMMountPointService;
28 import org.opendaylight.restconf.common.errors.RestconfDocumentedException;
29 import org.opendaylight.restconf.common.errors.RestconfFuture;
30 import org.opendaylight.restconf.nb.rfc8040.MediaTypes;
31 import org.opendaylight.restconf.nb.rfc8040.databind.DatabindContext;
32 import org.opendaylight.restconf.nb.rfc8040.databind.DatabindProvider;
33 import org.opendaylight.restconf.nb.rfc8040.databind.JsonOperationInputBody;
34 import org.opendaylight.restconf.nb.rfc8040.databind.OperationInputBody;
35 import org.opendaylight.restconf.nb.rfc8040.databind.XmlOperationInputBody;
36 import org.opendaylight.restconf.nb.rfc8040.legacy.InstanceIdentifierContext;
37 import org.opendaylight.restconf.nb.rfc8040.legacy.NormalizedNodePayload;
38 import org.opendaylight.restconf.nb.rfc8040.streams.StreamsConfiguration;
39 import org.opendaylight.restconf.nb.rfc8040.streams.listeners.ListenersBroker;
40 import org.opendaylight.yang.gen.v1.urn.opendaylight.device.notification.rev221106.SubscribeDeviceNotification;
41 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.remote.rev140114.CreateDataChangeEventSubscription;
42 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.remote.rev140114.CreateNotificationStream;
43 import org.opendaylight.yangtools.yang.common.ErrorTag;
44 import org.opendaylight.yangtools.yang.common.ErrorType;
45 import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
46 import org.slf4j.Logger;
47 import org.slf4j.LoggerFactory;
48
49 /**
50  * An operation resource represents a protocol operation defined with the YANG {@code rpc} statement. It is invoked
51  * using a POST method on the operation resource.
52  */
53 @Path("/")
54 public final class RestconfInvokeOperationsServiceImpl {
55     private static final Logger LOG = LoggerFactory.getLogger(RestconfInvokeOperationsServiceImpl.class);
56
57     private final DatabindProvider databindProvider;
58     private final MdsalRestconfServer server;
59     @Deprecated(forRemoval = true)
60     private final DOMMountPointService mountPointService;
61     private final SubscribeToStreamUtil streamUtils;
62     private final ListenersBroker listenersBroker;
63
64     public RestconfInvokeOperationsServiceImpl(final DatabindProvider databindProvider,
65             final MdsalRestconfServer server, final DOMMountPointService mountPointService,
66             final ListenersBroker listenersBroker, final StreamsConfiguration configuration) {
67         this.databindProvider = requireNonNull(databindProvider);
68         this.server = requireNonNull(server);
69         this.mountPointService = requireNonNull(mountPointService);
70         this.listenersBroker = requireNonNull(listenersBroker);
71         streamUtils = configuration.useSSE() ? SubscribeToStreamUtil.serverSentEvents(listenersBroker)
72             : SubscribeToStreamUtil.webSockets(listenersBroker);
73     }
74
75     /**
76      * Invoke RPC operation.
77      *
78      * @param identifier module name and rpc identifier string for the desired operation
79      * @param body the body of the operation
80      * @param uriInfo URI info
81      * @param ar {@link AsyncResponse} which needs to be completed with a {@link NormalizedNodePayload} output
82      */
83     @POST
84     // FIXME: identifier is just a *single* QName
85     @Path("/operations/{identifier:.+}")
86     @Consumes({
87         MediaTypes.APPLICATION_YANG_DATA_XML,
88         MediaType.APPLICATION_XML,
89         MediaType.TEXT_XML
90     })
91     @Produces({
92         MediaTypes.APPLICATION_YANG_DATA_JSON,
93         MediaTypes.APPLICATION_YANG_DATA_XML,
94         MediaType.APPLICATION_JSON,
95         MediaType.APPLICATION_XML,
96         MediaType.TEXT_XML
97     })
98     public void invokeRpcXML(@Encoded @PathParam("identifier") final String identifier, final InputStream body,
99             @Context final UriInfo uriInfo, @Suspended final AsyncResponse ar) {
100         try (var xmlBody = new XmlOperationInputBody(body)) {
101             invokeRpc(identifier, uriInfo, ar, xmlBody);
102         }
103     }
104
105     /**
106      * Invoke RPC operation.
107      *
108      * @param identifier module name and rpc identifier string for the desired operation
109      * @param body the body of the operation
110      * @param uriInfo URI info
111      * @param ar {@link AsyncResponse} which needs to be completed with a {@link NormalizedNodePayload} output
112      */
113     @POST
114     // FIXME: identifier is just a *single* QName
115     @Path("/operations/{identifier:.+}")
116     @Consumes({
117         MediaTypes.APPLICATION_YANG_DATA_JSON,
118         MediaType.APPLICATION_JSON,
119     })
120     @Produces({
121         MediaTypes.APPLICATION_YANG_DATA_JSON,
122         MediaTypes.APPLICATION_YANG_DATA_XML,
123         MediaType.APPLICATION_JSON,
124         MediaType.APPLICATION_XML,
125         MediaType.TEXT_XML
126     })
127     public void invokeRpcJSON(@Encoded @PathParam("identifier") final String identifier, final InputStream body,
128             @Context final UriInfo uriInfo, @Suspended final AsyncResponse ar) {
129         try (var jsonBody = new JsonOperationInputBody(body)) {
130             invokeRpc(identifier, uriInfo, ar, jsonBody);
131         }
132     }
133
134     private void invokeRpc(final String identifier, final UriInfo uriInfo, final AsyncResponse ar,
135             final OperationInputBody body) {
136         final var databind = databindProvider.currentContext();
137         final var reqPath = server.bindRequestPath(databind, identifier);
138
139         final ContainerNode input;
140         try {
141             input = body.toContainerNode(reqPath.inference());
142         } catch (IOException e) {
143             LOG.debug("Error reading input", e);
144             throw new RestconfDocumentedException("Error parsing input: " + e.getMessage(), ErrorType.PROTOCOL,
145                     ErrorTag.MALFORMED_MESSAGE, e);
146         }
147
148         hackInvokeRpc(databind, reqPath, uriInfo, input).addCallback(new JaxRsRestconfCallback<>(ar) {
149             @Override
150             Response transform(final Optional<ContainerNode> result) {
151                 return result
152                     .filter(output -> !output.isEmpty())
153                     .map(output -> Response.ok().entity(new NormalizedNodePayload(reqPath.inference(), output)).build())
154                     .orElseGet(() -> Response.noContent().build());
155             }
156         });
157
158     }
159
160     private RestconfFuture<Optional<ContainerNode>> hackInvokeRpc(final DatabindContext localDatabind,
161             final InstanceIdentifierContext reqPath, final UriInfo uriInfo, final ContainerNode input) {
162         // RPC type
163         final var type = reqPath.getSchemaNode().getQName();
164         final var mountPoint = reqPath.getMountPoint();
165         if (mountPoint == null) {
166             // Hacked-up integration of streams
167             if (CreateDataChangeEventSubscription.QNAME.equals(type)) {
168                 return RestconfFuture.of(Optional.of(CreateStreamUtil.createDataChangeNotifiStream(
169                     streamUtils.listenersBroker(), input, localDatabind.modelContext())));
170             } else if (CreateNotificationStream.QNAME.equals(type)) {
171                 return RestconfFuture.of(Optional.of(CreateStreamUtil.createNotificationStream(
172                     streamUtils.listenersBroker(), input, localDatabind.modelContext())));
173             } else if (SubscribeDeviceNotification.QNAME.equals(type)) {
174                 final var baseUrl = streamUtils.prepareUriByStreamName(uriInfo, "").toString();
175                 return RestconfFuture.of(Optional.of(CreateStreamUtil.createDeviceNotificationListener(baseUrl, input,
176                     streamUtils, mountPointService, listenersBroker)));
177             }
178         }
179
180         return server.getRestconfStrategy(reqPath.getSchemaContext(), mountPoint).invokeRpc(type, input);
181     }
182 }