2 * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.restconf.nb.rfc8040.rests.services.impl;
10 import static java.util.Objects.requireNonNull;
12 import com.google.common.collect.ImmutableSet;
14 import java.util.stream.Collectors;
15 import org.eclipse.jdt.annotation.Nullable;
16 import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
17 import org.opendaylight.mdsal.dom.api.DOMMountPoint;
18 import org.opendaylight.mdsal.dom.api.DOMMountPointService;
19 import org.opendaylight.mdsal.dom.api.DOMNotificationService;
20 import org.opendaylight.mdsal.dom.api.DOMRpcResult;
21 import org.opendaylight.mdsal.dom.api.DOMSchemaService;
22 import org.opendaylight.restconf.common.errors.RestconfDocumentedException;
23 import org.opendaylight.restconf.nb.rfc8040.rests.utils.RestconfStreamsConstants;
24 import org.opendaylight.restconf.nb.rfc8040.streams.listeners.DeviceNotificationListenerAdaptor;
25 import org.opendaylight.restconf.nb.rfc8040.streams.listeners.ListenersBroker;
26 import org.opendaylight.restconf.nb.rfc8040.utils.parser.IdentifierCodec;
27 import org.opendaylight.yang.gen.v1.urn.opendaylight.device.notification.rev221106.SubscribeDeviceNotificationInput;
28 import org.opendaylight.yang.gen.v1.urn.opendaylight.device.notification.rev221106.SubscribeDeviceNotificationOutput;
29 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.remote.rev140114.CreateDataChangeEventSubscriptionInput;
30 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.remote.rev140114.CreateDataChangeEventSubscriptionOutput;
31 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.remote.rev140114.CreateNotificationStreamInput;
32 import org.opendaylight.yang.gen.v1.urn.sal.restconf.event.subscription.rev140708.CreateDataChangeEventSubscriptionInput1.Scope;
33 import org.opendaylight.yang.gen.v1.urn.sal.restconf.event.subscription.rev140708.NotificationOutputTypeGrouping;
34 import org.opendaylight.yang.gen.v1.urn.sal.restconf.event.subscription.rev140708.NotificationOutputTypeGrouping.NotificationOutputType;
35 import org.opendaylight.yangtools.yang.common.ErrorTag;
36 import org.opendaylight.yangtools.yang.common.ErrorType;
37 import org.opendaylight.yangtools.yang.common.QName;
38 import org.opendaylight.yangtools.yang.common.QNameModule;
39 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
40 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifier;
41 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifierWithPredicates;
42 import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
43 import org.opendaylight.yangtools.yang.data.api.schema.DataContainerChild;
44 import org.opendaylight.yangtools.yang.data.api.schema.LeafNode;
45 import org.opendaylight.yangtools.yang.data.api.schema.LeafSetEntryNode;
46 import org.opendaylight.yangtools.yang.data.api.schema.LeafSetNode;
47 import org.opendaylight.yangtools.yang.data.impl.schema.Builders;
48 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
49 import org.opendaylight.yangtools.yang.model.api.EffectiveModelContext;
50 import org.opendaylight.yangtools.yang.model.api.stmt.NotificationEffectiveStatement;
51 import org.opendaylight.yangtools.yang.model.api.stmt.SchemaNodeIdentifier.Absolute;
54 * Utility class for creation of data-change-event or YANG notification streams.
56 final class CreateStreamUtil {
// QName modules used to qualify the leaf names declared below.
57 private static final QNameModule SAL_REMOTE_AUGMENT = NotificationOutputTypeGrouping.QNAME.getModule();
59 private static final QNameModule DEVICE_NOTIFICATION_MODULE = SubscribeDeviceNotificationInput.QNAME.getModule();
// Interned QNames of the input leaves read by the helpers in this class (datastore, scope, output type).
60 private static final QName DATASTORE_QNAME =
61 QName.create(SAL_REMOTE_AUGMENT, RestconfStreamsConstants.DATASTORE_PARAM_NAME).intern();
62 private static final QName SCOPE_QNAME =
63 QName.create(SAL_REMOTE_AUGMENT, RestconfStreamsConstants.SCOPE_PARAM_NAME).intern();
64 private static final QName OUTPUT_TYPE_QNAME =
65 QName.create(SAL_REMOTE_AUGMENT, "notification-output-type").intern();
// "path" input leaf of the device-notification subscription, read by createDeviceNotificationListener.
66 private static final QName DEVICE_NOTIFICATION_PATH_QNAME =
67 QName.create(DEVICE_NOTIFICATION_MODULE, "path").intern();
// Name of the "stream-path" leaf placed into the device-notification output container.
// NOTE(review): derived from the path QName, so presumably it shares that QName's module - confirm.
68 private static final QName DEVICE_NOTIFICATION_STREAM_PATH =
69 QName.create(DEVICE_NOTIFICATION_PATH_QNAME, "stream-path").intern();
// Node identifiers wrapping the QNames above; these are what the child lookups on the input use.
70 private static final NodeIdentifier DATASTORE_NODEID = NodeIdentifier.create(DATASTORE_QNAME);
71 private static final NodeIdentifier SCOPE_NODEID = NodeIdentifier.create(SCOPE_QNAME);
72 private static final NodeIdentifier OUTPUT_TYPE_NODEID = NodeIdentifier.create(OUTPUT_TYPE_QNAME);
73 private static final NodeIdentifier DEVICE_NOTIFICATION_PATH_NODEID =
74 NodeIdentifier.create(DEVICE_NOTIFICATION_PATH_QNAME);
// Identifier of the output container returned by both create*Stream methods.
75 private static final NodeIdentifier SAL_REMOTE_OUTPUT_NODEID =
76 NodeIdentifier.create(CreateDataChangeEventSubscriptionOutput.QNAME);
// "notifications" child of the create-notification-stream input, read by createNotificationStream.
77 private static final NodeIdentifier NOTIFICATIONS =
78 NodeIdentifier.create(QName.create(CreateNotificationStreamInput.QNAME, "notifications").intern());
// "path" leaf of the data-change subscription input, read by preparePath.
79 private static final NodeIdentifier PATH_NODEID =
80 NodeIdentifier.create(QName.create(CreateDataChangeEventSubscriptionInput.QNAME, "path").intern());
// "stream-name" leaf written into the output container with the generated stream name.
81 private static final NodeIdentifier STREAM_NAME_NODEID =
82 NodeIdentifier.create(QName.create(CreateDataChangeEventSubscriptionOutput.QNAME, "stream-name").intern());
// Private constructor: the fields and methods visible in this class are all static, so no instance is needed.
84 private CreateStreamUtil() {
89 * Create data-change-event stream with POST operation via RPC.
91 * @param input Input of RPC - example in JSON (data-change-event stream):
96 * "path": "/toaster:toaster/toaster:toasterStatus",
97 * "sal-remote-augment:datastore": "OPERATIONAL",
98 * "sal-remote-augment:scope": "ONE"
103 * @param refSchemaCtx Reference to {@link EffectiveModelContext}.
104 * @return Output of RPC as a {@link ContainerNode} (rather than a {@link DOMRpcResult}) - example in JSON:
109 * "stream-name": "toaster:toaster/toaster:toasterStatus/datastore=OPERATIONAL/scope=ONE"
115 // FIXME: this really should be a normal RPC implementation
116 static ContainerNode createDataChangeNotifiStream(final ListenersBroker listenersBroker, final ContainerNode input,
117 final EffectiveModelContext refSchemaCtx) {
// Flow: read the target path from the input, derive a stream name from path/datastore/scope,
// register a data-change listener under that name, then return the name in the output container.
118 // parsing out of container with settings and path
119 final YangInstanceIdentifier path = preparePath(input);
121 // building of stream name
// refSchemaCtx is null-checked here, right before it is handed over for path serialization.
122 final StringBuilder streamNameBuilder = new StringBuilder(
123 prepareDataChangeNotifiStreamName(path, requireNonNull(refSchemaCtx), input));
// The output type falls back to XML when the input carries no output-type leaf (see prepareOutputType).
124 final NotificationOutputType outputType = prepareOutputType(input);
// Only the JSON output type adds a "/<type name>" suffix to the stream name.
125 if (outputType.equals(NotificationOutputType.JSON)) {
126 streamNameBuilder.append('/').append(outputType.getName());
128 final String streamName = streamNameBuilder.toString();
130 // registration of the listener
131 listenersBroker.registerDataChangeListener(path, streamName, outputType);
133 // building of output
// The output container holds a single "stream-name" leaf with the generated name.
134 return Builders.containerBuilder()
135 .withNodeIdentifier(SAL_REMOTE_OUTPUT_NODEID)
136 .withChild(ImmutableNodes.leafNode(STREAM_NAME_NODEID, streamName))
140 // FIXME: this really should be a normal RPC implementation
141 static ContainerNode createNotificationStream(final ListenersBroker listenersBroker, final ContainerNode input,
142 final EffectiveModelContext refSchemaCtx) {
// Flow: collect the requested notification names from the input, validate each one against the
// schema context while composing the stream name, register a notification listener under that
// name, then return the name in the output container.
// The "notifications" child is cast, unchecked, to a leaf-set of strings and its entries are streamed.
// NOTE(review): the loop below calls qname.getModule(), so a String-to-QName step presumably sits
// between the map and the collect - that part is not visible in this view.
143 final var qnames = ((LeafSetNode<String>) input.getChildByArg(NOTIFICATIONS)).body().stream()
144 .map(LeafSetEntryNode::body)
147 .collect(ImmutableSet.toImmutableSet());
// The stream name starts with the notification-stream prefix followed by '/'.
149 final var streamNameBuilder = new StringBuilder(RestconfStreamsConstants.NOTIFICATION_STREAM).append('/');
// NOTE(review): haveFirst presumably guards the ',' separator appended inside the loop; the guarding
// condition is not visible in this view.
150 var haveFirst = false;
151 for (var qname : qnames) {
// Each name must resolve to a module known to the schema context ...
152 final var module = refSchemaCtx.findModuleStatement(qname.getModule())
153 .orElseThrow(() -> new RestconfDocumentedException(qname + " refers to an unknown module",
154 ErrorType.APPLICATION, ErrorTag.INVALID_VALUE));
// ... and to a schema tree node inside that module ...
// NOTE(review): the message below reads "refers to an notification"; it probably should say
// "an unknown notification". Left untouched here because it is a runtime string.
155 final var stmt = module.findSchemaTreeNode(qname)
156 .orElseThrow(() -> new RestconfDocumentedException(qname + " refers to an notification",
157 ErrorType.APPLICATION, ErrorTag.INVALID_VALUE));
// ... and that node has to be a notification statement.
158 if (!(stmt instanceof NotificationEffectiveStatement)) {
159 throw new RestconfDocumentedException(qname + " refers to a non-notification",
160 ErrorType.APPLICATION, ErrorTag.INVALID_VALUE);
164 streamNameBuilder.append(',');
// Each notification contributes "<module name>:<notification local name>" to the stream name.
168 streamNameBuilder.append(module.argument().getLocalName()).append(':').append(qname.getLocalName());
// Same convention as the data-change stream: only the JSON output type adds a "/<type name>" suffix.
170 final var outputType = prepareOutputType(input);
171 if (outputType.equals(NotificationOutputType.JSON)) {
172 streamNameBuilder.append('/').append(outputType.getName());
175 final var streamName = streamNameBuilder.toString();
177 // registration of the listener
178 listenersBroker.registerNotificationListener(qnames, streamName, outputType);
// The output container holds a single "stream-name" leaf with the generated name.
180 return Builders.containerBuilder()
181 .withNodeIdentifier(SAL_REMOTE_OUTPUT_NODEID)
182 .withChild(ImmutableNodes.leafNode(STREAM_NAME_NODEID, streamName))
187 * Create device notification stream.
189 * @param baseUrl base URL to which the device name and the notification-type query are appended
190 * @param input RPC input
191 * @param mountPointService dom mount point service
192 * @param listenersBroker {@link ListenersBroker}
193 * @return Output of RPC as a {@link ContainerNode} (rather than a {@link DOMRpcResult}) - example in JSON
195 // FIXME: this should be an RPC invocation
196 static ContainerNode createDeviceNotificationListener(final String baseUrl, final ContainerNode input,
197 final SubscribeToStreamUtil streamUtil, final DOMMountPointService mountPointService,
198 final ListenersBroker listenersBroker) {
// Flow: resolve the mount point addressed by the input path, gather the notifications its schema
// declares, register and start a device notification listener, then return the stream path.
// NOTE(review): streamUtil is not referenced in any line visible in this view - confirm whether it is needed.
199 // parsing out of container with settings and path
// The "path" leaf is mandatory: a missing leaf is reported as DATA_MISSING. The body is cast unchecked.
201 final YangInstanceIdentifier path =
202 (YangInstanceIdentifier) input.findChildByArg(DEVICE_NOTIFICATION_PATH_NODEID)
203 .map(DataContainerChild::body)
204 .orElseThrow(() -> new RestconfDocumentedException("No path specified", ErrorType.APPLICATION,
205 ErrorTag.DATA_MISSING));
// The last path argument must be a keyed list entry; listId stays in scope after this guard.
207 if (!(path.getLastPathArgument() instanceof NodeIdentifierWithPredicates listId)) {
208 throw new RestconfDocumentedException("Path does not refer to a list item", ErrorType.APPLICATION,
209 ErrorTag.INVALID_VALUE);
// Exactly one key is accepted, because its value is used as the device name below.
211 if (listId.size() != 1) {
212 throw new RestconfDocumentedException("Target list uses multiple keys", ErrorType.APPLICATION,
213 ErrorTag.INVALID_VALUE);
// Device name is the string form of the single key value.
215 final String deviceName = listId.values().iterator().next().toString();
// The mount point, its notification service and its schema service are all required;
// each absence is reported as OPERATION_FAILED with its own message.
217 final DOMMountPoint mountPoint = mountPointService.getMountPoint(path)
218 .orElseThrow(() -> new RestconfDocumentedException("Mount point not available", ErrorType.APPLICATION,
219 ErrorTag.OPERATION_FAILED));
221 final DOMNotificationService mountNotifService = mountPoint.getService(DOMNotificationService.class)
222 .orElseThrow(() -> new RestconfDocumentedException("Mount point does not support notifications",
223 ErrorType.APPLICATION, ErrorTag.OPERATION_FAILED));
// NOTE(review): this statement continues on a line that is not visible in this view; whatever follows
// has to turn the schema service into the EffectiveModelContext being assigned.
225 final EffectiveModelContext mountModelContext = mountPoint.getService(DOMSchemaService.class)
226 .orElseThrow(() -> new RestconfDocumentedException("Mount point schema not available",
227 ErrorType.APPLICATION, ErrorTag.OPERATION_FAILED))
// Collect an absolute schema node identifier for every notification statement found as a
// substatement of a module in the mount point's model context.
229 final Set<Absolute> notificationPaths = mountModelContext.getModuleStatements().values().stream()
230 .flatMap(module -> module.streamEffectiveSubstatements(NotificationEffectiveStatement.class))
231 .map(notification -> Absolute.of(notification.argument()))
232 .collect(Collectors.toUnmodifiableSet());
// A device whose schema declares no such notification cannot be subscribed to.
233 if (notificationPaths.isEmpty()) {
234 throw new RestconfDocumentedException("Device does not support notification", ErrorType.APPLICATION,
235 ErrorTag.OPERATION_FAILED);
// Register the adaptor with the broker first, then start listening on the mount point's notification service.
238 final DeviceNotificationListenerAdaptor notificationListenerAdapter = listenersBroker
239 .registerDeviceNotificationListener(deviceName, prepareOutputType(input), mountModelContext,
240 mountPointService, mountPoint.getIdentifier());
241 notificationListenerAdapter.listen(mountNotifService, notificationPaths);
// Output leaf value is baseUrl + deviceName + "?" + NOTIFICATION_TYPE + "=" + DEVICE.
243 return Builders.containerBuilder()
244 .withNodeIdentifier(new NodeIdentifier(SubscribeDeviceNotificationOutput.QNAME))
245 .withChild(ImmutableNodes.leafNode(DEVICE_NOTIFICATION_STREAM_PATH, baseUrl + deviceName + "?"
246 + RestconfStreamsConstants.NOTIFICATION_TYPE + "=" + RestconfStreamsConstants.DEVICE))
251 * Prepare {@link NotificationOutputType}.
253 * @param data Container with stream settings (RPC create-stream).
254 * @return Parsed {@link NotificationOutputType}.
256 private static NotificationOutputType prepareOutputType(final ContainerNode data) {
// Reads the output-type leaf as a string; extractStringLeaf may return null.
257 final String outputName = extractStringLeaf(data, OUTPUT_TYPE_NODEID);
// A missing leaf defaults to XML; otherwise the string is resolved with valueOf.
// NOTE(review): scope parsing elsewhere in this class uses ofName(...) while this uses valueOf(...) -
// confirm that the leaf value always matches the enum constant name.
258 return outputName != null ? NotificationOutputType.valueOf(outputName) : NotificationOutputType.XML;
262 * Prepare stream name.
264 * @param path Path of element from which data-change-event notifications are going to be generated.
265 * @param schemaContext Schema context.
266 * @param data Container with stream settings (RPC create-stream).
267 * @return Parsed stream name.
269 private static String prepareDataChangeNotifiStreamName(final YangInstanceIdentifier path,
270 final EffectiveModelContext schemaContext, final ContainerNode data) {
// Datastore defaults to CONFIGURATION when the input carries no datastore leaf.
271 final String datastoreName = extractStringLeaf(data, DATASTORE_NODEID);
272 final LogicalDatastoreType datastoreType = datastoreName != null ? LogicalDatastoreType.valueOf(datastoreName)
273 : LogicalDatastoreType.CONFIGURATION;
// Scope defaults to BASE when the input carries no scope leaf; within this method it only
// contributes to the generated name.
275 final String scopeName = extractStringLeaf(data, SCOPE_NODEID);
276 // FIXME: this is not really used
277 final Scope scope = scopeName != null ? Scope.ofName(scopeName) : Scope.BASE;
// Result: DATA_SUBSCRIPTION + "/" + createStreamNameFromUri(<serialized path>/<datastore param>=<type>/<scope param>=<scope>).
// The whole concatenation after the serialized path is passed to createStreamNameFromUri as one argument.
279 return RestconfStreamsConstants.DATA_SUBSCRIPTION
280 + "/" + ListenersBroker.createStreamNameFromUri(IdentifierCodec.serialize(path, schemaContext)
281 + "/" + RestconfStreamsConstants.DATASTORE_PARAM_NAME + "=" + datastoreType
282 + "/" + RestconfStreamsConstants.SCOPE_PARAM_NAME + "=" + scope);
286 * Prepare {@link YangInstanceIdentifier} of stream source.
288 * @param data Container with stream settings (RPC create-stream).
289 * @return Parsed {@link YangInstanceIdentifier} of data element from which the data-change-event notifications
290 * are going to be generated.
292 private static YangInstanceIdentifier preparePath(final ContainerNode data) {
// Looks up the "path" leaf; childByArg may yield null, hence the explicit null check below.
293 final var pathLeaf = data.childByArg(PATH_NODEID);
// Accept the leaf only when its body is already a YangInstanceIdentifier.
// NOTE(review): the success branch (presumably returning pathValue) is not visible in this view.
294 if (pathLeaf != null && pathLeaf.body() instanceof YangInstanceIdentifier pathValue) {
// A missing leaf or a body of any other type is reported as OPERATION_FAILED.
298 throw new RestconfDocumentedException("Instance identifier was not normalized correctly",
299 ErrorType.APPLICATION, ErrorTag.OPERATION_FAILED);
302 private static @Nullable String extractStringLeaf(final ContainerNode data, final NodeIdentifier childName) {
303 return data.childByArg(childName) instanceof LeafNode<?> leafNode && leafNode.body() instanceof String str