Implement create-notification-stream
[netconf.git] / restconf / restconf-nb / src / main / java / org / opendaylight / restconf / nb / rfc8040 / rests / services / impl / CreateStreamUtil.java
1 /*
2  * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.restconf.nb.rfc8040.rests.services.impl;
9
10 import static java.util.Objects.requireNonNull;
11
12 import com.google.common.collect.ImmutableSet;
13 import java.util.Set;
14 import java.util.stream.Collectors;
15 import org.eclipse.jdt.annotation.Nullable;
16 import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
17 import org.opendaylight.mdsal.dom.api.DOMMountPoint;
18 import org.opendaylight.mdsal.dom.api.DOMMountPointService;
19 import org.opendaylight.mdsal.dom.api.DOMNotificationService;
20 import org.opendaylight.mdsal.dom.api.DOMRpcResult;
21 import org.opendaylight.mdsal.dom.api.DOMSchemaService;
22 import org.opendaylight.restconf.common.errors.RestconfDocumentedException;
23 import org.opendaylight.restconf.nb.rfc8040.rests.utils.RestconfStreamsConstants;
24 import org.opendaylight.restconf.nb.rfc8040.streams.listeners.DeviceNotificationListenerAdaptor;
25 import org.opendaylight.restconf.nb.rfc8040.streams.listeners.ListenersBroker;
26 import org.opendaylight.restconf.nb.rfc8040.utils.parser.IdentifierCodec;
27 import org.opendaylight.yang.gen.v1.urn.opendaylight.device.notification.rev221106.SubscribeDeviceNotificationInput;
28 import org.opendaylight.yang.gen.v1.urn.opendaylight.device.notification.rev221106.SubscribeDeviceNotificationOutput;
29 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.remote.rev140114.CreateDataChangeEventSubscriptionInput;
30 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.remote.rev140114.CreateDataChangeEventSubscriptionOutput;
31 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.remote.rev140114.CreateNotificationStreamInput;
32 import org.opendaylight.yang.gen.v1.urn.sal.restconf.event.subscription.rev140708.CreateDataChangeEventSubscriptionInput1.Scope;
33 import org.opendaylight.yang.gen.v1.urn.sal.restconf.event.subscription.rev140708.NotificationOutputTypeGrouping;
34 import org.opendaylight.yang.gen.v1.urn.sal.restconf.event.subscription.rev140708.NotificationOutputTypeGrouping.NotificationOutputType;
35 import org.opendaylight.yangtools.yang.common.ErrorTag;
36 import org.opendaylight.yangtools.yang.common.ErrorType;
37 import org.opendaylight.yangtools.yang.common.QName;
38 import org.opendaylight.yangtools.yang.common.QNameModule;
39 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
40 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifier;
41 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifierWithPredicates;
42 import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
43 import org.opendaylight.yangtools.yang.data.api.schema.DataContainerChild;
44 import org.opendaylight.yangtools.yang.data.api.schema.LeafNode;
45 import org.opendaylight.yangtools.yang.data.api.schema.LeafSetEntryNode;
46 import org.opendaylight.yangtools.yang.data.api.schema.LeafSetNode;
47 import org.opendaylight.yangtools.yang.data.impl.schema.Builders;
48 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
49 import org.opendaylight.yangtools.yang.model.api.EffectiveModelContext;
50 import org.opendaylight.yangtools.yang.model.api.stmt.NotificationEffectiveStatement;
51 import org.opendaylight.yangtools.yang.model.api.stmt.SchemaNodeIdentifier.Absolute;
52
53 /**
54  * Utility class for creation of data-change-event or YANG notification streams.
55  */
56 final class CreateStreamUtil {
57     private static final QNameModule SAL_REMOTE_AUGMENT = NotificationOutputTypeGrouping.QNAME.getModule();
58
59     private static final QNameModule DEVICE_NOTIFICATION_MODULE = SubscribeDeviceNotificationInput.QNAME.getModule();
60     private static final QName DATASTORE_QNAME =
61         QName.create(SAL_REMOTE_AUGMENT, RestconfStreamsConstants.DATASTORE_PARAM_NAME).intern();
62     private static final QName SCOPE_QNAME =
63         QName.create(SAL_REMOTE_AUGMENT, RestconfStreamsConstants.SCOPE_PARAM_NAME).intern();
64     private static final QName OUTPUT_TYPE_QNAME =
65         QName.create(SAL_REMOTE_AUGMENT, "notification-output-type").intern();
66     private static final QName DEVICE_NOTIFICATION_PATH_QNAME =
67         QName.create(DEVICE_NOTIFICATION_MODULE, "path").intern();
68     private static final QName DEVICE_NOTIFICATION_STREAM_PATH =
69         QName.create(DEVICE_NOTIFICATION_PATH_QNAME, "stream-path").intern();
70     private static final NodeIdentifier DATASTORE_NODEID = NodeIdentifier.create(DATASTORE_QNAME);
71     private static final NodeIdentifier SCOPE_NODEID = NodeIdentifier.create(SCOPE_QNAME);
72     private static final NodeIdentifier OUTPUT_TYPE_NODEID = NodeIdentifier.create(OUTPUT_TYPE_QNAME);
73     private static final NodeIdentifier DEVICE_NOTIFICATION_PATH_NODEID =
74         NodeIdentifier.create(DEVICE_NOTIFICATION_PATH_QNAME);
75     private static final NodeIdentifier SAL_REMOTE_OUTPUT_NODEID =
76         NodeIdentifier.create(CreateDataChangeEventSubscriptionOutput.QNAME);
77     private static final NodeIdentifier NOTIFICATIONS =
78         NodeIdentifier.create(QName.create(CreateNotificationStreamInput.QNAME, "notifications").intern());
79     private static final NodeIdentifier PATH_NODEID =
80         NodeIdentifier.create(QName.create(CreateDataChangeEventSubscriptionInput.QNAME, "path").intern());
81     private static final NodeIdentifier STREAM_NAME_NODEID =
82         NodeIdentifier.create(QName.create(CreateDataChangeEventSubscriptionOutput.QNAME, "stream-name").intern());
83
84     private CreateStreamUtil() {
85         // Hidden on purpose
86     }
87
88     /**
89      * Create data-change-event stream with POST operation via RPC.
90      *
91      * @param input Input of RPC - example in JSON (data-change-event stream):
92      *              <pre>
93      *              {@code
94      *                  {
95      *                      "input": {
96      *                          "path": "/toaster:toaster/toaster:toasterStatus",
97      *                          "sal-remote-augment:datastore": "OPERATIONAL",
98      *                          "sal-remote-augment:scope": "ONE"
99      *                      }
100      *                  }
101      *              }
102      *              </pre>
103      * @param refSchemaCtx Reference to {@link EffectiveModelContext}.
104      * @return {@link DOMRpcResult} - Output of RPC - example in JSON:
105      *     <pre>
106      *     {@code
107      *         {
108      *             "output": {
109      *                 "stream-name": "toaster:toaster/toaster:toasterStatus/datastore=OPERATIONAL/scope=ONE"
110      *             }
111      *         }
112      *     }
113      *     </pre>
114      */
115     // FIXME: this really should be a normal RPC implementation
116     static ContainerNode createDataChangeNotifiStream(final ListenersBroker listenersBroker, final ContainerNode input,
117             final EffectiveModelContext refSchemaCtx) {
118         // parsing out of container with settings and path
119         final YangInstanceIdentifier path = preparePath(input);
120
121         // building of stream name
122         final StringBuilder streamNameBuilder = new StringBuilder(
123                 prepareDataChangeNotifiStreamName(path, requireNonNull(refSchemaCtx), input));
124         final NotificationOutputType outputType = prepareOutputType(input);
125         if (outputType.equals(NotificationOutputType.JSON)) {
126             streamNameBuilder.append('/').append(outputType.getName());
127         }
128         final String streamName = streamNameBuilder.toString();
129
130         // registration of the listener
131         listenersBroker.registerDataChangeListener(path, streamName, outputType);
132
133         // building of output
134         return Builders.containerBuilder()
135             .withNodeIdentifier(SAL_REMOTE_OUTPUT_NODEID)
136             .withChild(ImmutableNodes.leafNode(STREAM_NAME_NODEID, streamName))
137             .build();
138     }
139
140     // FIXME: this really should be a normal RPC implementation
141     static ContainerNode createNotificationStream(final ListenersBroker listenersBroker, final ContainerNode input,
142             final EffectiveModelContext refSchemaCtx) {
143         final var qnames = ((LeafSetNode<String>) input.getChildByArg(NOTIFICATIONS)).body().stream()
144             .map(LeafSetEntryNode::body)
145             .map(QName::create)
146             .sorted()
147             .collect(ImmutableSet.toImmutableSet());
148
149         final var streamNameBuilder = new StringBuilder(RestconfStreamsConstants.NOTIFICATION_STREAM).append('/');
150         var haveFirst = false;
151         for (var qname : qnames) {
152             final var module = refSchemaCtx.findModuleStatement(qname.getModule())
153                 .orElseThrow(() -> new RestconfDocumentedException(qname + " refers to an unknown module",
154                     ErrorType.APPLICATION, ErrorTag.INVALID_VALUE));
155             final var stmt = module.findSchemaTreeNode(qname)
156                 .orElseThrow(() -> new RestconfDocumentedException(qname + " refers to an notification",
157                     ErrorType.APPLICATION, ErrorTag.INVALID_VALUE));
158             if (!(stmt instanceof NotificationEffectiveStatement)) {
159                 throw new RestconfDocumentedException(qname + " refers to a non-notification",
160                     ErrorType.APPLICATION, ErrorTag.INVALID_VALUE);
161             }
162
163             if (haveFirst) {
164                 streamNameBuilder.append(',');
165             } else {
166                 haveFirst = true;
167             }
168             streamNameBuilder.append(module.argument().getLocalName()).append(':').append(qname.getLocalName());
169         }
170         final var outputType = prepareOutputType(input);
171         if (outputType.equals(NotificationOutputType.JSON)) {
172             streamNameBuilder.append('/').append(outputType.getName());
173         }
174
175         final var streamName = streamNameBuilder.toString();
176
177         // registration of the listener
178         listenersBroker.registerNotificationListener(qnames, streamName, outputType);
179
180         return Builders.containerBuilder()
181             .withNodeIdentifier(SAL_REMOTE_OUTPUT_NODEID)
182             .withChild(ImmutableNodes.leafNode(STREAM_NAME_NODEID, streamName))
183             .build();
184     }
185
186     /**
187      * Create device notification stream.
188      *
189      * @param baseUrl base Url
190      * @param input RPC input
191      * @param mountPointService dom mount point service
192      * @param listenersBroker {@link ListenersBroker}
193      * @return {@link DOMRpcResult} - Output of RPC - example in JSON
194      */
195     // FIXME: this should be an RPC invocation
196     static ContainerNode createDeviceNotificationListener(final String baseUrl, final ContainerNode input,
197             final SubscribeToStreamUtil streamUtil, final DOMMountPointService mountPointService,
198             final ListenersBroker listenersBroker) {
199         // parsing out of container with settings and path
200         // FIXME: ugly cast
201         final YangInstanceIdentifier path =
202             (YangInstanceIdentifier) input.findChildByArg(DEVICE_NOTIFICATION_PATH_NODEID)
203                 .map(DataContainerChild::body)
204                 .orElseThrow(() -> new RestconfDocumentedException("No path specified", ErrorType.APPLICATION,
205                     ErrorTag.DATA_MISSING));
206
207         if (!(path.getLastPathArgument() instanceof NodeIdentifierWithPredicates listId)) {
208             throw new RestconfDocumentedException("Path does not refer to a list item", ErrorType.APPLICATION,
209                 ErrorTag.INVALID_VALUE);
210         }
211         if (listId.size() != 1) {
212             throw new RestconfDocumentedException("Target list uses multiple keys", ErrorType.APPLICATION,
213                 ErrorTag.INVALID_VALUE);
214         }
215         final String deviceName = listId.values().iterator().next().toString();
216
217         final DOMMountPoint mountPoint = mountPointService.getMountPoint(path)
218             .orElseThrow(() -> new RestconfDocumentedException("Mount point not available", ErrorType.APPLICATION,
219                 ErrorTag.OPERATION_FAILED));
220
221         final DOMNotificationService mountNotifService = mountPoint.getService(DOMNotificationService.class)
222             .orElseThrow(() -> new RestconfDocumentedException("Mount point does not support notifications",
223                 ErrorType.APPLICATION, ErrorTag.OPERATION_FAILED));
224
225         final EffectiveModelContext mountModelContext = mountPoint.getService(DOMSchemaService.class)
226             .orElseThrow(() -> new RestconfDocumentedException("Mount point schema not available",
227                 ErrorType.APPLICATION, ErrorTag.OPERATION_FAILED))
228             .getGlobalContext();
229         final Set<Absolute> notificationPaths = mountModelContext.getModuleStatements().values().stream()
230             .flatMap(module -> module.streamEffectiveSubstatements(NotificationEffectiveStatement.class))
231             .map(notification -> Absolute.of(notification.argument()))
232             .collect(Collectors.toUnmodifiableSet());
233         if (notificationPaths.isEmpty()) {
234             throw new RestconfDocumentedException("Device does not support notification", ErrorType.APPLICATION,
235                 ErrorTag.OPERATION_FAILED);
236         }
237
238         final DeviceNotificationListenerAdaptor notificationListenerAdapter = listenersBroker
239             .registerDeviceNotificationListener(deviceName, prepareOutputType(input), mountModelContext,
240                 mountPointService, mountPoint.getIdentifier());
241         notificationListenerAdapter.listen(mountNotifService, notificationPaths);
242
243         return Builders.containerBuilder()
244             .withNodeIdentifier(new NodeIdentifier(SubscribeDeviceNotificationOutput.QNAME))
245             .withChild(ImmutableNodes.leafNode(DEVICE_NOTIFICATION_STREAM_PATH, baseUrl + deviceName + "?"
246                 + RestconfStreamsConstants.NOTIFICATION_TYPE + "=" + RestconfStreamsConstants.DEVICE))
247             .build();
248     }
249
250     /**
251      * Prepare {@link NotificationOutputType}.
252      *
253      * @param data Container with stream settings (RPC create-stream).
254      * @return Parsed {@link NotificationOutputType}.
255      */
256     private static NotificationOutputType prepareOutputType(final ContainerNode data) {
257         final String outputName = extractStringLeaf(data, OUTPUT_TYPE_NODEID);
258         return outputName != null ? NotificationOutputType.valueOf(outputName) : NotificationOutputType.XML;
259     }
260
261     /**
262      * Prepare stream name.
263      *
264      * @param path          Path of element from which data-change-event notifications are going to be generated.
265      * @param schemaContext Schema context.
266      * @param data          Container with stream settings (RPC create-stream).
267      * @return Parsed stream name.
268      */
269     private static String prepareDataChangeNotifiStreamName(final YangInstanceIdentifier path,
270             final EffectiveModelContext schemaContext, final ContainerNode data) {
271         final String datastoreName = extractStringLeaf(data, DATASTORE_NODEID);
272         final LogicalDatastoreType datastoreType = datastoreName != null ? LogicalDatastoreType.valueOf(datastoreName)
273             : LogicalDatastoreType.CONFIGURATION;
274
275         final String scopeName = extractStringLeaf(data, SCOPE_NODEID);
276         // FIXME: this is not really used
277         final Scope scope = scopeName != null ? Scope.ofName(scopeName) : Scope.BASE;
278
279         return RestconfStreamsConstants.DATA_SUBSCRIPTION
280             + "/" + ListenersBroker.createStreamNameFromUri(IdentifierCodec.serialize(path, schemaContext)
281                 + "/" + RestconfStreamsConstants.DATASTORE_PARAM_NAME + "=" + datastoreType
282                 + "/" + RestconfStreamsConstants.SCOPE_PARAM_NAME + "=" + scope);
283     }
284
285     /**
286      * Prepare {@link YangInstanceIdentifier} of stream source.
287      *
288      * @param data Container with stream settings (RPC create-stream).
289      * @return Parsed {@link YangInstanceIdentifier} of data element from which the data-change-event notifications
290      *         are going to be generated.
291      */
292     private static YangInstanceIdentifier preparePath(final ContainerNode data) {
293         final var pathLeaf = data.childByArg(PATH_NODEID);
294         if (pathLeaf != null && pathLeaf.body() instanceof YangInstanceIdentifier pathValue) {
295             return pathValue;
296         }
297
298         throw new RestconfDocumentedException("Instance identifier was not normalized correctly",
299             ErrorType.APPLICATION, ErrorTag.OPERATION_FAILED);
300     }
301
302     private static @Nullable String extractStringLeaf(final ContainerNode data, final NodeIdentifier childName) {
303         return data.childByArg(childName) instanceof LeafNode<?> leafNode && leafNode.body() instanceof String str
304             ? str : null;
305     }
306 }