Move RestconfStreamsConstants
[netconf.git] / restconf / restconf-nb / src / main / java / org / opendaylight / restconf / nb / rfc8040 / rests / services / impl / CreateStreamUtil.java
1 /*
2  * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.restconf.nb.rfc8040.rests.services.impl;
9
10 import com.google.common.collect.ImmutableSet;
11 import java.util.Optional;
12 import org.eclipse.jdt.annotation.Nullable;
13 import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
14 import org.opendaylight.mdsal.dom.api.DOMMountPoint;
15 import org.opendaylight.mdsal.dom.api.DOMMountPointService;
16 import org.opendaylight.mdsal.dom.api.DOMNotificationService;
17 import org.opendaylight.mdsal.dom.api.DOMRpcResult;
18 import org.opendaylight.mdsal.dom.api.DOMSchemaService;
19 import org.opendaylight.restconf.common.errors.RestconfDocumentedException;
20 import org.opendaylight.restconf.common.errors.RestconfFuture;
21 import org.opendaylight.restconf.nb.rfc8040.streams.ListenersBroker;
22 import org.opendaylight.restconf.nb.rfc8040.streams.RestconfStreamsConstants;
23 import org.opendaylight.yang.gen.v1.urn.opendaylight.device.notification.rev221106.SubscribeDeviceNotificationInput;
24 import org.opendaylight.yang.gen.v1.urn.opendaylight.device.notification.rev221106.SubscribeDeviceNotificationOutput;
25 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.remote.rev140114.CreateDataChangeEventSubscriptionInput;
26 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.remote.rev140114.CreateDataChangeEventSubscriptionOutput;
27 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.remote.rev140114.CreateNotificationStreamInput;
28 import org.opendaylight.yang.gen.v1.urn.sal.restconf.event.subscription.rev140708.CreateDataChangeEventSubscriptionInput1.Scope;
29 import org.opendaylight.yang.gen.v1.urn.sal.restconf.event.subscription.rev140708.NotificationOutputTypeGrouping;
30 import org.opendaylight.yang.gen.v1.urn.sal.restconf.event.subscription.rev140708.NotificationOutputTypeGrouping.NotificationOutputType;
31 import org.opendaylight.yangtools.yang.common.ErrorTag;
32 import org.opendaylight.yangtools.yang.common.ErrorType;
33 import org.opendaylight.yangtools.yang.common.QName;
34 import org.opendaylight.yangtools.yang.common.QNameModule;
35 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
36 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifier;
37 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifierWithPredicates;
38 import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
39 import org.opendaylight.yangtools.yang.data.api.schema.DataContainerChild;
40 import org.opendaylight.yangtools.yang.data.api.schema.LeafNode;
41 import org.opendaylight.yangtools.yang.data.api.schema.LeafSetEntryNode;
42 import org.opendaylight.yangtools.yang.data.api.schema.LeafSetNode;
43 import org.opendaylight.yangtools.yang.data.impl.schema.Builders;
44 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
45 import org.opendaylight.yangtools.yang.model.api.EffectiveModelContext;
46 import org.opendaylight.yangtools.yang.model.api.stmt.NotificationEffectiveStatement;
47 import org.opendaylight.yangtools.yang.model.api.stmt.SchemaNodeIdentifier.Absolute;
48
49 /**
50  * Utility class for creation of data-change-event or YANG notification streams.
51  */
52 final class CreateStreamUtil {
53     private static final QNameModule SAL_REMOTE_AUGMENT = NotificationOutputTypeGrouping.QNAME.getModule();
54
55     private static final QNameModule DEVICE_NOTIFICATION_MODULE = SubscribeDeviceNotificationInput.QNAME.getModule();
56     private static final QName DATASTORE_QNAME =
57         QName.create(SAL_REMOTE_AUGMENT, RestconfStreamsConstants.DATASTORE_PARAM_NAME).intern();
58     private static final QName SCOPE_QNAME =
59         QName.create(SAL_REMOTE_AUGMENT, RestconfStreamsConstants.SCOPE_PARAM_NAME).intern();
60     private static final QName OUTPUT_TYPE_QNAME =
61         QName.create(SAL_REMOTE_AUGMENT, "notification-output-type").intern();
62     private static final QName DEVICE_NOTIFICATION_PATH_QNAME =
63         QName.create(DEVICE_NOTIFICATION_MODULE, "path").intern();
64     private static final QName DEVICE_NOTIFICATION_STREAM_PATH =
65         QName.create(DEVICE_NOTIFICATION_PATH_QNAME, "stream-path").intern();
66     private static final NodeIdentifier DATASTORE_NODEID = NodeIdentifier.create(DATASTORE_QNAME);
67     private static final NodeIdentifier SCOPE_NODEID = NodeIdentifier.create(SCOPE_QNAME);
68     private static final NodeIdentifier OUTPUT_TYPE_NODEID = NodeIdentifier.create(OUTPUT_TYPE_QNAME);
69     private static final NodeIdentifier DEVICE_NOTIFICATION_PATH_NODEID =
70         NodeIdentifier.create(DEVICE_NOTIFICATION_PATH_QNAME);
71     private static final NodeIdentifier SAL_REMOTE_OUTPUT_NODEID =
72         NodeIdentifier.create(CreateDataChangeEventSubscriptionOutput.QNAME);
73     private static final NodeIdentifier NOTIFICATIONS =
74         NodeIdentifier.create(QName.create(CreateNotificationStreamInput.QNAME, "notifications").intern());
75     private static final NodeIdentifier PATH_NODEID =
76         NodeIdentifier.create(QName.create(CreateDataChangeEventSubscriptionInput.QNAME, "path").intern());
77     private static final NodeIdentifier STREAM_NAME_NODEID =
78         NodeIdentifier.create(QName.create(CreateDataChangeEventSubscriptionOutput.QNAME, "stream-name").intern());
79
80     private CreateStreamUtil() {
81         // Hidden on purpose
82     }
83
84     /**
85      * Create data-change-event stream with POST operation via RPC.
86      *
87      * @param input Input of RPC - example in JSON (data-change-event stream):
88      *              <pre>
89      *              {@code
90      *                  {
91      *                      "input": {
92      *                          "path": "/toaster:toaster/toaster:toasterStatus",
93      *                          "sal-remote-augment:datastore": "OPERATIONAL",
94      *                          "sal-remote-augment:scope": "ONE"
95      *                      }
96      *                  }
97      *              }
98      *              </pre>
99      * @param modelContext Reference to {@link EffectiveModelContext}.
100      * @return {@link DOMRpcResult} - Output of RPC - example in JSON:
101      *     <pre>
102      *     {@code
103      *         {
104      *             "output": {
105      *                 "stream-name": "toaster:toaster/toaster:toasterStatus/datastore=OPERATIONAL/scope=ONE"
106      *             }
107      *         }
108      *     }
109      *     </pre>
110      */
111     // FIXME: this really should be a normal RPC implementation
112     static RestconfFuture<Optional<ContainerNode>> createDataChangeNotifiStream(final ListenersBroker listenersBroker,
113             final ContainerNode input, final EffectiveModelContext modelContext) {
114         final var datastoreName = extractStringLeaf(input, DATASTORE_NODEID);
115         final var scopeName = extractStringLeaf(input, SCOPE_NODEID);
116         final var adapter = listenersBroker.registerDataChangeListener(modelContext,
117             datastoreName != null ? LogicalDatastoreType.valueOf(datastoreName) : LogicalDatastoreType.CONFIGURATION,
118             preparePath(input), scopeName != null ? Scope.ofName(scopeName) : Scope.BASE, prepareOutputType(input));
119
120         // building of output
121         return RestconfFuture.of(Optional.of(Builders.containerBuilder()
122             .withNodeIdentifier(SAL_REMOTE_OUTPUT_NODEID)
123             .withChild(ImmutableNodes.leafNode(STREAM_NAME_NODEID, adapter.getStreamName()))
124             .build()));
125     }
126
127     // FIXME: this really should be a normal RPC implementation
128     static RestconfFuture<Optional<ContainerNode>> createNotificationStream(final ListenersBroker listenersBroker,
129             final ContainerNode input, final EffectiveModelContext modelContext) {
130         final var qnames = ((LeafSetNode<String>) input.getChildByArg(NOTIFICATIONS)).body().stream()
131             .map(LeafSetEntryNode::body)
132             .map(QName::create)
133             .sorted()
134             .collect(ImmutableSet.toImmutableSet());
135
136         for (var qname : qnames) {
137             if (modelContext.findNotification(qname).isEmpty()) {
138                 throw new RestconfDocumentedException(qname + " refers to an unknown notification",
139                     ErrorType.APPLICATION, ErrorTag.INVALID_VALUE);
140             }
141         }
142
143         // registration of the listener
144         final var adapter = listenersBroker.registerNotificationListener(modelContext, qnames,
145             prepareOutputType(input));
146
147         return RestconfFuture.of(Optional.of(Builders.containerBuilder()
148             .withNodeIdentifier(SAL_REMOTE_OUTPUT_NODEID)
149             .withChild(ImmutableNodes.leafNode(STREAM_NAME_NODEID, adapter.getStreamName()))
150             .build()));
151     }
152
153     /**
154      * Create device notification stream.
155      *
156      * @param baseUrl base Url
157      * @param input RPC input
158      * @param mountPointService dom mount point service
159      * @param listenersBroker {@link ListenersBroker}
160      * @return {@link DOMRpcResult} - Output of RPC - example in JSON
161      */
162     // FIXME: this should be an RPC invocation
163     static RestconfFuture<Optional<ContainerNode>> createDeviceNotificationListener(
164             final ListenersBroker listenersBroker, final ContainerNode input, final String baseUrl,
165             final DOMMountPointService mountPointService) {
166         // parsing out of container with settings and path
167         // FIXME: ugly cast
168         final var path = (YangInstanceIdentifier) input.findChildByArg(DEVICE_NOTIFICATION_PATH_NODEID)
169                 .map(DataContainerChild::body)
170                 .orElseThrow(() -> new RestconfDocumentedException("No path specified", ErrorType.APPLICATION,
171                     ErrorTag.DATA_MISSING));
172
173         if (!(path.getLastPathArgument() instanceof NodeIdentifierWithPredicates listId)) {
174             throw new RestconfDocumentedException("Path does not refer to a list item", ErrorType.APPLICATION,
175                 ErrorTag.INVALID_VALUE);
176         }
177         if (listId.size() != 1) {
178             throw new RestconfDocumentedException("Target list uses multiple keys", ErrorType.APPLICATION,
179                 ErrorTag.INVALID_VALUE);
180         }
181         final String deviceName = listId.values().iterator().next().toString();
182
183         final DOMMountPoint mountPoint = mountPointService.getMountPoint(path)
184             .orElseThrow(() -> new RestconfDocumentedException("Mount point not available", ErrorType.APPLICATION,
185                 ErrorTag.OPERATION_FAILED));
186
187         final DOMNotificationService mountNotifService = mountPoint.getService(DOMNotificationService.class)
188             .orElseThrow(() -> new RestconfDocumentedException("Mount point does not support notifications",
189                 ErrorType.APPLICATION, ErrorTag.OPERATION_FAILED));
190
191         final var mountModelContext = mountPoint.getService(DOMSchemaService.class)
192             .orElseThrow(() -> new RestconfDocumentedException("Mount point schema not available",
193                 ErrorType.APPLICATION, ErrorTag.OPERATION_FAILED))
194             .getGlobalContext();
195         final var notificationPaths = mountModelContext.getModuleStatements().values().stream()
196             .flatMap(module -> module.streamEffectiveSubstatements(NotificationEffectiveStatement.class))
197             .map(notification -> Absolute.of(notification.argument()))
198             .collect(ImmutableSet.toImmutableSet());
199         if (notificationPaths.isEmpty()) {
200             throw new RestconfDocumentedException("Device does not support notification", ErrorType.APPLICATION,
201                 ErrorTag.OPERATION_FAILED);
202         }
203
204         final var notificationListenerAdapter = listenersBroker.registerDeviceNotificationListener(deviceName,
205             prepareOutputType(input), mountModelContext, mountPointService, mountPoint.getIdentifier());
206         notificationListenerAdapter.listen(mountNotifService, notificationPaths);
207
208         return RestconfFuture.of(Optional.of(Builders.containerBuilder()
209             .withNodeIdentifier(new NodeIdentifier(SubscribeDeviceNotificationOutput.QNAME))
210             .withChild(ImmutableNodes.leafNode(DEVICE_NOTIFICATION_STREAM_PATH,
211                 baseUrl + notificationListenerAdapter.getStreamName()))
212             .build()));
213     }
214
215     /**
216      * Prepare {@link NotificationOutputType}.
217      *
218      * @param data Container with stream settings (RPC create-stream).
219      * @return Parsed {@link NotificationOutputType}.
220      */
221     private static NotificationOutputType prepareOutputType(final ContainerNode data) {
222         final String outputName = extractStringLeaf(data, OUTPUT_TYPE_NODEID);
223         return outputName != null ? NotificationOutputType.valueOf(outputName) : NotificationOutputType.XML;
224     }
225
226     /**
227      * Prepare {@link YangInstanceIdentifier} of stream source.
228      *
229      * @param data Container with stream settings (RPC create-stream).
230      * @return Parsed {@link YangInstanceIdentifier} of data element from which the data-change-event notifications
231      *         are going to be generated.
232      */
233     private static YangInstanceIdentifier preparePath(final ContainerNode data) {
234         final var pathLeaf = data.childByArg(PATH_NODEID);
235         if (pathLeaf != null && pathLeaf.body() instanceof YangInstanceIdentifier pathValue) {
236             return pathValue;
237         }
238
239         throw new RestconfDocumentedException("Instance identifier was not normalized correctly",
240             ErrorType.APPLICATION, ErrorTag.OPERATION_FAILED);
241     }
242
243     private static @Nullable String extractStringLeaf(final ContainerNode data, final NodeIdentifier childName) {
244         return data.childByArg(childName) instanceof LeafNode<?> leafNode && leafNode.body() instanceof String str
245             ? str : null;
246     }
247 }