2 * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.restconf.nb.rfc8040.rests.services.impl;
10 import static java.util.Objects.requireNonNull;
12 import com.google.common.collect.ImmutableSet;
14 import java.util.stream.Collectors;
15 import org.eclipse.jdt.annotation.Nullable;
16 import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
17 import org.opendaylight.mdsal.dom.api.DOMMountPoint;
18 import org.opendaylight.mdsal.dom.api.DOMMountPointService;
19 import org.opendaylight.mdsal.dom.api.DOMNotificationService;
20 import org.opendaylight.mdsal.dom.api.DOMRpcResult;
21 import org.opendaylight.mdsal.dom.api.DOMSchemaService;
22 import org.opendaylight.mdsal.dom.spi.DefaultDOMRpcResult;
23 import org.opendaylight.restconf.common.errors.RestconfDocumentedException;
24 import org.opendaylight.restconf.nb.rfc8040.legacy.NormalizedNodePayload;
25 import org.opendaylight.restconf.nb.rfc8040.rests.utils.RestconfStreamsConstants;
26 import org.opendaylight.restconf.nb.rfc8040.streams.listeners.DeviceNotificationListenerAdaptor;
27 import org.opendaylight.restconf.nb.rfc8040.streams.listeners.ListenersBroker;
28 import org.opendaylight.restconf.nb.rfc8040.streams.listeners.NotificationListenerAdapter;
29 import org.opendaylight.restconf.nb.rfc8040.utils.parser.IdentifierCodec;
30 import org.opendaylight.yang.gen.v1.urn.opendaylight.device.notification.rev221106.SubscribeDeviceNotificationInput;
31 import org.opendaylight.yang.gen.v1.urn.opendaylight.device.notification.rev221106.SubscribeDeviceNotificationOutput;
32 import org.opendaylight.yang.gen.v1.urn.sal.restconf.event.subscription.rev140708.CreateDataChangeEventSubscriptionInput1.Scope;
33 import org.opendaylight.yang.gen.v1.urn.sal.restconf.event.subscription.rev140708.NotificationOutputTypeGrouping;
34 import org.opendaylight.yang.gen.v1.urn.sal.restconf.event.subscription.rev140708.NotificationOutputTypeGrouping.NotificationOutputType;
35 import org.opendaylight.yangtools.yang.common.ErrorTag;
36 import org.opendaylight.yangtools.yang.common.ErrorType;
37 import org.opendaylight.yangtools.yang.common.QName;
38 import org.opendaylight.yangtools.yang.common.QNameModule;
39 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
40 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.AugmentationIdentifier;
41 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifier;
42 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifierWithPredicates;
43 import org.opendaylight.yangtools.yang.data.api.schema.AugmentationNode;
44 import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
45 import org.opendaylight.yangtools.yang.data.api.schema.DataContainerChild;
46 import org.opendaylight.yangtools.yang.data.api.schema.LeafNode;
47 import org.opendaylight.yangtools.yang.data.impl.schema.Builders;
48 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
49 import org.opendaylight.yangtools.yang.model.api.EffectiveModelContext;
50 import org.opendaylight.yangtools.yang.model.api.Module;
51 import org.opendaylight.yangtools.yang.model.api.NotificationDefinition;
52 import org.opendaylight.yangtools.yang.model.api.stmt.NotificationEffectiveStatement;
53 import org.opendaylight.yangtools.yang.model.api.stmt.SchemaNodeIdentifier.Absolute;
54 import org.slf4j.Logger;
55 import org.slf4j.LoggerFactory;
58 * Utility class for creation of data-change-event or YANG notification streams.
60 final class CreateStreamUtil {
61 private static final Logger LOG = LoggerFactory.getLogger(CreateStreamUtil.class);
62 private static final QNameModule SAL_REMOTE_AUGMENT = NotificationOutputTypeGrouping.QNAME.getModule();
64 private static final QNameModule DEVICE_NOTIFICATION_MODULE = SubscribeDeviceNotificationInput.QNAME.getModule();
65 private static final QName DATASTORE_QNAME =
66 QName.create(SAL_REMOTE_AUGMENT, RestconfStreamsConstants.DATASTORE_PARAM_NAME).intern();
67 private static final QName SCOPE_QNAME =
68 QName.create(SAL_REMOTE_AUGMENT, RestconfStreamsConstants.SCOPE_PARAM_NAME).intern();
69 private static final QName OUTPUT_TYPE_QNAME =
70 QName.create(SAL_REMOTE_AUGMENT, "notification-output-type").intern();
71 private static final QName DEVICE_NOTIFICATION_PATH_QNAME =
72 QName.create(DEVICE_NOTIFICATION_MODULE, "path").intern();
73 private static final QName DEVICE_NOTIFICATION_STREAM_PATH =
74 QName.create(DEVICE_NOTIFICATION_PATH_QNAME, "stream-path").intern();
75 private static final NodeIdentifier DATASTORE_NODEID = NodeIdentifier.create(DATASTORE_QNAME);
76 private static final NodeIdentifier SCOPE_NODEID = NodeIdentifier.create(SCOPE_QNAME);
77 private static final NodeIdentifier OUTPUT_TYPE_NODEID = NodeIdentifier.create(OUTPUT_TYPE_QNAME);
78 private static final NodeIdentifier DEVICE_NOTIFICATION_PATH_NODEID =
79 NodeIdentifier.create(DEVICE_NOTIFICATION_PATH_QNAME);
80 private static final AugmentationIdentifier SAL_REMOTE_AUG_IDENTIFIER = new AugmentationIdentifier(
81 ImmutableSet.of(SCOPE_QNAME, DATASTORE_QNAME, OUTPUT_TYPE_QNAME));
/**
 * Private constructor: this class exposes only static members.
 */
private CreateStreamUtil() {
    // Hidden on purpose
}
88 * Create data-change-event or notification stream with POST operation via RPC.
90 * @param payload Input of RPC - example in JSON (data-change-event stream):
95 * "path": "/toaster:toaster/toaster:toasterStatus",
96 * "sal-remote-augment:datastore": "OPERATIONAL",
97 * "sal-remote-augment:scope": "ONE"
102 * @param refSchemaCtx Reference to {@link EffectiveModelContext}.
103 * @return {@link DOMRpcResult} - Output of RPC - example in JSON:
108 * "stream-name": "toaster:toaster/toaster:toasterStatus/datastore=OPERATIONAL/scope=ONE"
114 static DOMRpcResult createDataChangeNotifiStream(final NormalizedNodePayload payload,
115 final EffectiveModelContext refSchemaCtx) {
116 // parsing out of container with settings and path
117 final ContainerNode data = (ContainerNode) requireNonNull(payload).getData();
118 final QName qname = payload.getInstanceIdentifierContext().getSchemaNode().getQName();
119 final YangInstanceIdentifier path = preparePath(data, qname);
121 // building of stream name
122 final StringBuilder streamNameBuilder = new StringBuilder(
123 prepareDataChangeNotifiStreamName(path, requireNonNull(refSchemaCtx), data));
124 final NotificationOutputType outputType = prepareOutputType(data);
125 if (outputType.equals(NotificationOutputType.JSON)) {
126 streamNameBuilder.append('/').append(outputType.getName());
128 final String streamName = streamNameBuilder.toString();
130 // registration of the listener
131 ListenersBroker.getInstance().registerDataChangeListener(path, streamName, outputType);
133 // building of output
134 final QName outputQname = QName.create(qname, "output");
135 final QName streamNameQname = QName.create(qname, "stream-name");
137 return new DefaultDOMRpcResult(Builders.containerBuilder()
138 .withNodeIdentifier(new NodeIdentifier(outputQname))
139 .withChild(ImmutableNodes.leafNode(streamNameQname, streamName))
144 * Create device notification stream.
146 * @param baseUrl base Url
147 * @param payload data
148 * @param refSchemaCtx Reference to {@link EffectiveModelContext}.
149 * @param streamUtil stream utility
150 * @param mountPointService dom mount point service
151 * @return {@link DOMRpcResult} - Output of RPC - example in JSON
153 static DOMRpcResult createDeviceNotificationListener(final String baseUrl, final NormalizedNodePayload payload,
154 final EffectiveModelContext refSchemaCtx, final SubscribeToStreamUtil streamUtil,
155 final DOMMountPointService mountPointService) {
156 // parsing out of container with settings and path
158 final ContainerNode data = (ContainerNode) requireNonNull(payload).getData();
160 final YangInstanceIdentifier path =
161 (YangInstanceIdentifier) data.findChildByArg(DEVICE_NOTIFICATION_PATH_NODEID)
162 .map(DataContainerChild::body)
163 .orElseThrow(() -> new RestconfDocumentedException("No path specified", ErrorType.APPLICATION,
164 ErrorTag.DATA_MISSING));
166 if (!(path.getLastPathArgument() instanceof NodeIdentifierWithPredicates listId)) {
167 throw new RestconfDocumentedException("Path does not refer to a list item", ErrorType.APPLICATION,
168 ErrorTag.INVALID_VALUE);
170 if (listId.size() != 1) {
171 throw new RestconfDocumentedException("Target list uses multiple keys", ErrorType.APPLICATION,
172 ErrorTag.INVALID_VALUE);
174 final String deviceName = listId.values().iterator().next().toString();
176 final DOMMountPoint mountPoint = mountPointService.getMountPoint(path)
177 .orElseThrow(() -> new RestconfDocumentedException("Mount point not available", ErrorType.APPLICATION,
178 ErrorTag.OPERATION_FAILED));
180 final DOMNotificationService mountNotifService = mountPoint.getService(DOMNotificationService.class)
181 .orElseThrow(() -> new RestconfDocumentedException("Mount point does not support notifications",
182 ErrorType.APPLICATION, ErrorTag.OPERATION_FAILED));
184 // FIXME: what is the relationship to the unused refSchemaCtx?
185 final EffectiveModelContext mountModelContext = mountPoint.getService(DOMSchemaService.class)
186 .orElseThrow(() -> new RestconfDocumentedException("Mount point schema not available",
187 ErrorType.APPLICATION, ErrorTag.OPERATION_FAILED))
189 final Set<Absolute> notificationPaths = mountModelContext.getModuleStatements().values().stream()
190 .flatMap(module -> module.streamEffectiveSubstatements(NotificationEffectiveStatement.class))
191 .map(notification -> Absolute.of(notification.argument()))
192 .collect(Collectors.toUnmodifiableSet());
193 if (notificationPaths.isEmpty()) {
194 throw new RestconfDocumentedException("Device does not support notification", ErrorType.APPLICATION,
195 ErrorTag.OPERATION_FAILED);
198 final DeviceNotificationListenerAdaptor notificationListenerAdapter = ListenersBroker.getInstance()
199 .registerDeviceNotificationListener(deviceName, prepareOutputType(data), mountModelContext,
200 mountPointService, mountPoint.getIdentifier());
201 notificationListenerAdapter.listen(mountNotifService, notificationPaths);
203 // building of output
204 return new DefaultDOMRpcResult(Builders.containerBuilder()
205 .withNodeIdentifier(new NodeIdentifier(SubscribeDeviceNotificationOutput.QNAME))
206 .withChild(ImmutableNodes.leafNode(DEVICE_NOTIFICATION_STREAM_PATH, baseUrl + deviceName
207 + "?" + RestconfStreamsConstants.NOTIFICATION_TYPE + "=" + RestconfStreamsConstants.DEVICE))
212 * Prepare {@link NotificationOutputType}.
214 * @param data Container with stream settings (RPC create-stream).
215 * @return Parsed {@link NotificationOutputType}.
217 private static NotificationOutputType prepareOutputType(final ContainerNode data) {
218 final String outputName = extractStringLeaf(data, OUTPUT_TYPE_NODEID);
219 return outputName != null ? NotificationOutputType.valueOf(outputName) : NotificationOutputType.XML;
223 * Prepare stream name.
225 * @param path Path of element from which data-change-event notifications are going to be generated.
226 * @param schemaContext Schema context.
227 * @param data Container with stream settings (RPC create-stream).
228 * @return Parsed stream name.
230 private static String prepareDataChangeNotifiStreamName(final YangInstanceIdentifier path,
231 final EffectiveModelContext schemaContext, final ContainerNode data) {
232 final String datastoreName = extractStringLeaf(data, DATASTORE_NODEID);
233 final LogicalDatastoreType datastoreType = datastoreName != null ? LogicalDatastoreType.valueOf(datastoreName)
234 : LogicalDatastoreType.CONFIGURATION;
236 final String scopeName = extractStringLeaf(data, SCOPE_NODEID);
237 // FIXME: this is not really used
238 final Scope scope = scopeName != null ? Scope.ofName(scopeName) : Scope.BASE;
240 return RestconfStreamsConstants.DATA_SUBSCRIPTION
241 + "/" + ListenersBroker.createStreamNameFromUri(IdentifierCodec.serialize(path, schemaContext)
242 + "/" + RestconfStreamsConstants.DATASTORE_PARAM_NAME + "=" + datastoreType
243 + "/" + RestconfStreamsConstants.SCOPE_PARAM_NAME + "=" + scope);
247 * Prepare {@link YangInstanceIdentifier} of stream source.
249 * @param data Container with stream settings (RPC create-stream).
250 * @param qualifiedName QName of the input RPC context (used only in debugging).
251 * @return Parsed {@link YangInstanceIdentifier} of data element from which the data-change-event notifications
252 * are going to be generated.
254 private static YangInstanceIdentifier preparePath(final ContainerNode data, final QName qualifiedName) {
255 final Object pathValue = data.findChildByArg(new NodeIdentifier(QName.create(qualifiedName, "path")))
256 .map(DataContainerChild::body)
258 if (!(pathValue instanceof YangInstanceIdentifier)) {
259 LOG.debug("Instance identifier {} was not normalized correctly", qualifiedName);
260 throw new RestconfDocumentedException(
261 "Instance identifier was not normalized correctly",
262 ErrorType.APPLICATION,
263 ErrorTag.OPERATION_FAILED);
265 return (YangInstanceIdentifier) pathValue;
268 private static @Nullable String extractStringLeaf(final ContainerNode data, final NodeIdentifier childName) {
269 final DataContainerChild augNode = data.childByArg(SAL_REMOTE_AUG_IDENTIFIER);
270 if (augNode instanceof AugmentationNode) {
271 final DataContainerChild enumNode = ((AugmentationNode) augNode).childByArg(childName);
272 if (enumNode instanceof LeafNode) {
273 final Object value = enumNode.body();
274 if (value instanceof String) {
275 return (String) value;
283 * Create YANG notification stream using notification definition in YANG schema.
285 * @param notificationDefinition YANG notification definition.
286 * @param refSchemaCtx Reference to {@link EffectiveModelContext}
287 * @param outputType Output type (XML or JSON).
288 * @return {@link NotificationListenerAdapter}
290 static NotificationListenerAdapter createYangNotifiStream(final NotificationDefinition notificationDefinition,
291 final EffectiveModelContext refSchemaCtx, final NotificationOutputType outputType) {
292 final var streamName = parseNotificationStreamName(requireNonNull(notificationDefinition),
293 requireNonNull(refSchemaCtx), requireNonNull(outputType.getName()));
294 final var listenersBroker = ListenersBroker.getInstance();
296 final var existing = listenersBroker.notificationListenerFor(streamName);
297 return existing != null ? existing
298 : listenersBroker.registerNotificationListener(
299 Absolute.of(notificationDefinition.getQName()), streamName, outputType);
302 private static String parseNotificationStreamName(final NotificationDefinition notificationDefinition,
303 final EffectiveModelContext refSchemaCtx, final String outputType) {
304 final QName notificationDefinitionQName = notificationDefinition.getQName();
305 final Module module = refSchemaCtx.findModule(
306 notificationDefinitionQName.getModule().getNamespace(),
307 notificationDefinitionQName.getModule().getRevision()).orElse(null);
308 requireNonNull(module, String.format("Module for namespace %s does not exist.",
309 notificationDefinitionQName.getModule().getNamespace()));
311 final StringBuilder streamNameBuilder = new StringBuilder();
312 streamNameBuilder.append(RestconfStreamsConstants.NOTIFICATION_STREAM)
314 .append(module.getName())
316 .append(notificationDefinitionQName.getLocalName());
317 if (outputType.equals(NotificationOutputType.JSON.getName())) {
318 streamNameBuilder.append('/').append(NotificationOutputType.JSON.getName());
320 return streamNameBuilder.toString();