Capture ListenersBroker instances
[netconf.git] / restconf / restconf-nb / src / main / java / org / opendaylight / restconf / nb / rfc8040 / rests / services / impl / CreateStreamUtil.java
1 /*
2  * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.restconf.nb.rfc8040.rests.services.impl;
9
10 import static java.util.Objects.requireNonNull;
11
12 import java.util.Set;
13 import java.util.stream.Collectors;
14 import org.eclipse.jdt.annotation.Nullable;
15 import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
16 import org.opendaylight.mdsal.dom.api.DOMMountPoint;
17 import org.opendaylight.mdsal.dom.api.DOMMountPointService;
18 import org.opendaylight.mdsal.dom.api.DOMNotificationService;
19 import org.opendaylight.mdsal.dom.api.DOMRpcResult;
20 import org.opendaylight.mdsal.dom.api.DOMSchemaService;
21 import org.opendaylight.mdsal.dom.spi.DefaultDOMRpcResult;
22 import org.opendaylight.restconf.common.errors.RestconfDocumentedException;
23 import org.opendaylight.restconf.nb.rfc8040.rests.utils.RestconfStreamsConstants;
24 import org.opendaylight.restconf.nb.rfc8040.streams.listeners.DeviceNotificationListenerAdaptor;
25 import org.opendaylight.restconf.nb.rfc8040.streams.listeners.ListenersBroker;
26 import org.opendaylight.restconf.nb.rfc8040.utils.parser.IdentifierCodec;
27 import org.opendaylight.yang.gen.v1.urn.opendaylight.device.notification.rev221106.SubscribeDeviceNotificationInput;
28 import org.opendaylight.yang.gen.v1.urn.opendaylight.device.notification.rev221106.SubscribeDeviceNotificationOutput;
29 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.remote.rev140114.CreateDataChangeEventSubscriptionInput;
30 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.remote.rev140114.CreateDataChangeEventSubscriptionOutput;
31 import org.opendaylight.yang.gen.v1.urn.sal.restconf.event.subscription.rev140708.CreateDataChangeEventSubscriptionInput1.Scope;
32 import org.opendaylight.yang.gen.v1.urn.sal.restconf.event.subscription.rev140708.NotificationOutputTypeGrouping;
33 import org.opendaylight.yang.gen.v1.urn.sal.restconf.event.subscription.rev140708.NotificationOutputTypeGrouping.NotificationOutputType;
34 import org.opendaylight.yangtools.yang.common.ErrorTag;
35 import org.opendaylight.yangtools.yang.common.ErrorType;
36 import org.opendaylight.yangtools.yang.common.QName;
37 import org.opendaylight.yangtools.yang.common.QNameModule;
38 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
39 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifier;
40 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifierWithPredicates;
41 import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
42 import org.opendaylight.yangtools.yang.data.api.schema.DataContainerChild;
43 import org.opendaylight.yangtools.yang.data.api.schema.LeafNode;
44 import org.opendaylight.yangtools.yang.data.impl.schema.Builders;
45 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
46 import org.opendaylight.yangtools.yang.model.api.EffectiveModelContext;
47 import org.opendaylight.yangtools.yang.model.api.stmt.NotificationEffectiveStatement;
48 import org.opendaylight.yangtools.yang.model.api.stmt.SchemaNodeIdentifier.Absolute;
49
50 /**
51  * Utility class for creation of data-change-event or YANG notification streams.
52  */
53 final class CreateStreamUtil {
54     private static final QNameModule SAL_REMOTE_AUGMENT = NotificationOutputTypeGrouping.QNAME.getModule();
55
56     private static final QNameModule DEVICE_NOTIFICATION_MODULE = SubscribeDeviceNotificationInput.QNAME.getModule();
57     private static final QName DATASTORE_QNAME =
58         QName.create(SAL_REMOTE_AUGMENT, RestconfStreamsConstants.DATASTORE_PARAM_NAME).intern();
59     private static final QName SCOPE_QNAME =
60         QName.create(SAL_REMOTE_AUGMENT, RestconfStreamsConstants.SCOPE_PARAM_NAME).intern();
61     private static final QName OUTPUT_TYPE_QNAME =
62         QName.create(SAL_REMOTE_AUGMENT, "notification-output-type").intern();
63     private static final QName DEVICE_NOTIFICATION_PATH_QNAME =
64         QName.create(DEVICE_NOTIFICATION_MODULE, "path").intern();
65     private static final QName DEVICE_NOTIFICATION_STREAM_PATH =
66         QName.create(DEVICE_NOTIFICATION_PATH_QNAME, "stream-path").intern();
67     private static final NodeIdentifier DATASTORE_NODEID = NodeIdentifier.create(DATASTORE_QNAME);
68     private static final NodeIdentifier SCOPE_NODEID = NodeIdentifier.create(SCOPE_QNAME);
69     private static final NodeIdentifier OUTPUT_TYPE_NODEID = NodeIdentifier.create(OUTPUT_TYPE_QNAME);
70     private static final NodeIdentifier DEVICE_NOTIFICATION_PATH_NODEID =
71         NodeIdentifier.create(DEVICE_NOTIFICATION_PATH_QNAME);
72     private static final NodeIdentifier SAL_REMOTE_OUTPUT_NODEID =
73         NodeIdentifier.create(CreateDataChangeEventSubscriptionOutput.QNAME);
74     private static final NodeIdentifier PATH_NODEID =
75         NodeIdentifier.create(QName.create(CreateDataChangeEventSubscriptionInput.QNAME, "path").intern());
76     private static final NodeIdentifier STREAM_NAME_NODEID =
77         NodeIdentifier.create(QName.create(CreateDataChangeEventSubscriptionOutput.QNAME, "stream-name").intern());
78
79     private CreateStreamUtil() {
80         // Hidden on purpose
81     }
82
83     /**
84      * Create data-change-event stream with POST operation via RPC.
85      *
86      * @param input Input of RPC - example in JSON (data-change-event stream):
87      *              <pre>
88      *              {@code
89      *                  {
90      *                      "input": {
91      *                          "path": "/toaster:toaster/toaster:toasterStatus",
92      *                          "sal-remote-augment:datastore": "OPERATIONAL",
93      *                          "sal-remote-augment:scope": "ONE"
94      *                      }
95      *                  }
96      *              }
97      *              </pre>
98      * @param refSchemaCtx Reference to {@link EffectiveModelContext}.
99      * @return {@link DOMRpcResult} - Output of RPC - example in JSON:
100      *     <pre>
101      *     {@code
102      *         {
103      *             "output": {
104      *                 "stream-name": "toaster:toaster/toaster:toasterStatus/datastore=OPERATIONAL/scope=ONE"
105      *             }
106      *         }
107      *     }
108      *     </pre>
109      */
110     // FIXME: this really should be a normal RPC implementation
111     static DOMRpcResult createDataChangeNotifiStream(final ListenersBroker listenersBroker, final ContainerNode input,
112             final EffectiveModelContext refSchemaCtx) {
113         // parsing out of container with settings and path
114         final YangInstanceIdentifier path = preparePath(input);
115
116         // building of stream name
117         final StringBuilder streamNameBuilder = new StringBuilder(
118                 prepareDataChangeNotifiStreamName(path, requireNonNull(refSchemaCtx), input));
119         final NotificationOutputType outputType = prepareOutputType(input);
120         if (outputType.equals(NotificationOutputType.JSON)) {
121             streamNameBuilder.append('/').append(outputType.getName());
122         }
123         final String streamName = streamNameBuilder.toString();
124
125         // registration of the listener
126         listenersBroker.registerDataChangeListener(path, streamName, outputType);
127
128         // building of output
129         return new DefaultDOMRpcResult(Builders.containerBuilder()
130             .withNodeIdentifier(SAL_REMOTE_OUTPUT_NODEID)
131             .withChild(ImmutableNodes.leafNode(STREAM_NAME_NODEID, streamName))
132             .build());
133     }
134
135     /**
136      * Create device notification stream.
137      *
138      * @param baseUrl base Url
139      * @param input RPC input
140      * @param streamUtil stream utility
141      * @param mountPointService dom mount point service
142      * @return {@link DOMRpcResult} - Output of RPC - example in JSON
143      */
144     static DOMRpcResult createDeviceNotificationListener(final String baseUrl, final ContainerNode input,
145             final SubscribeToStreamUtil streamUtil, final DOMMountPointService mountPointService) {
146         // parsing out of container with settings and path
147         // FIXME: ugly cast
148         final YangInstanceIdentifier path =
149             (YangInstanceIdentifier) input.findChildByArg(DEVICE_NOTIFICATION_PATH_NODEID)
150                 .map(DataContainerChild::body)
151                 .orElseThrow(() -> new RestconfDocumentedException("No path specified", ErrorType.APPLICATION,
152                     ErrorTag.DATA_MISSING));
153
154         if (!(path.getLastPathArgument() instanceof NodeIdentifierWithPredicates listId)) {
155             throw new RestconfDocumentedException("Path does not refer to a list item", ErrorType.APPLICATION,
156                 ErrorTag.INVALID_VALUE);
157         }
158         if (listId.size() != 1) {
159             throw new RestconfDocumentedException("Target list uses multiple keys", ErrorType.APPLICATION,
160                 ErrorTag.INVALID_VALUE);
161         }
162         final String deviceName = listId.values().iterator().next().toString();
163
164         final DOMMountPoint mountPoint = mountPointService.getMountPoint(path)
165             .orElseThrow(() -> new RestconfDocumentedException("Mount point not available", ErrorType.APPLICATION,
166                 ErrorTag.OPERATION_FAILED));
167
168         final DOMNotificationService mountNotifService = mountPoint.getService(DOMNotificationService.class)
169             .orElseThrow(() -> new RestconfDocumentedException("Mount point does not support notifications",
170                 ErrorType.APPLICATION, ErrorTag.OPERATION_FAILED));
171
172         final EffectiveModelContext mountModelContext = mountPoint.getService(DOMSchemaService.class)
173             .orElseThrow(() -> new RestconfDocumentedException("Mount point schema not available",
174                 ErrorType.APPLICATION, ErrorTag.OPERATION_FAILED))
175             .getGlobalContext();
176         final Set<Absolute> notificationPaths = mountModelContext.getModuleStatements().values().stream()
177             .flatMap(module -> module.streamEffectiveSubstatements(NotificationEffectiveStatement.class))
178             .map(notification -> Absolute.of(notification.argument()))
179             .collect(Collectors.toUnmodifiableSet());
180         if (notificationPaths.isEmpty()) {
181             throw new RestconfDocumentedException("Device does not support notification", ErrorType.APPLICATION,
182                 ErrorTag.OPERATION_FAILED);
183         }
184
185         final DeviceNotificationListenerAdaptor notificationListenerAdapter = streamUtil.listenersBroker()
186             .registerDeviceNotificationListener(deviceName, prepareOutputType(input), mountModelContext,
187                 mountPointService, mountPoint.getIdentifier());
188         notificationListenerAdapter.listen(mountNotifService, notificationPaths);
189
190         // building of output
191         return new DefaultDOMRpcResult(Builders.containerBuilder()
192             .withNodeIdentifier(new NodeIdentifier(SubscribeDeviceNotificationOutput.QNAME))
193             .withChild(ImmutableNodes.leafNode(DEVICE_NOTIFICATION_STREAM_PATH, baseUrl + deviceName
194                 + "?" + RestconfStreamsConstants.NOTIFICATION_TYPE + "=" + RestconfStreamsConstants.DEVICE))
195             .build());
196     }
197
198     /**
199      * Prepare {@link NotificationOutputType}.
200      *
201      * @param data Container with stream settings (RPC create-stream).
202      * @return Parsed {@link NotificationOutputType}.
203      */
204     private static NotificationOutputType prepareOutputType(final ContainerNode data) {
205         final String outputName = extractStringLeaf(data, OUTPUT_TYPE_NODEID);
206         return outputName != null ? NotificationOutputType.valueOf(outputName) : NotificationOutputType.XML;
207     }
208
209     /**
210      * Prepare stream name.
211      *
212      * @param path          Path of element from which data-change-event notifications are going to be generated.
213      * @param schemaContext Schema context.
214      * @param data          Container with stream settings (RPC create-stream).
215      * @return Parsed stream name.
216      */
217     private static String prepareDataChangeNotifiStreamName(final YangInstanceIdentifier path,
218             final EffectiveModelContext schemaContext, final ContainerNode data) {
219         final String datastoreName = extractStringLeaf(data, DATASTORE_NODEID);
220         final LogicalDatastoreType datastoreType = datastoreName != null ? LogicalDatastoreType.valueOf(datastoreName)
221             : LogicalDatastoreType.CONFIGURATION;
222
223         final String scopeName = extractStringLeaf(data, SCOPE_NODEID);
224         // FIXME: this is not really used
225         final Scope scope = scopeName != null ? Scope.ofName(scopeName) : Scope.BASE;
226
227         return RestconfStreamsConstants.DATA_SUBSCRIPTION
228             + "/" + ListenersBroker.createStreamNameFromUri(IdentifierCodec.serialize(path, schemaContext)
229                 + "/" + RestconfStreamsConstants.DATASTORE_PARAM_NAME + "=" + datastoreType
230                 + "/" + RestconfStreamsConstants.SCOPE_PARAM_NAME + "=" + scope);
231     }
232
233     /**
234      * Prepare {@link YangInstanceIdentifier} of stream source.
235      *
236      * @param data Container with stream settings (RPC create-stream).
237      * @return Parsed {@link YangInstanceIdentifier} of data element from which the data-change-event notifications
238      *         are going to be generated.
239      */
240     private static YangInstanceIdentifier preparePath(final ContainerNode data) {
241         final var pathLeaf = data.childByArg(PATH_NODEID);
242         if (pathLeaf != null && pathLeaf.body() instanceof YangInstanceIdentifier pathValue) {
243             return pathValue;
244         }
245
246         throw new RestconfDocumentedException("Instance identifier was not normalized correctly",
247             ErrorType.APPLICATION, ErrorTag.OPERATION_FAILED);
248     }
249
250     private static @Nullable String extractStringLeaf(final ContainerNode data, final NodeIdentifier childName) {
251         return data.childByArg(childName) instanceof LeafNode<?> leafNode && leafNode.body() instanceof String str
252             ? str : null;
253     }
254 }