Acquire DOMNotificationService before changing state
[netconf.git] / restconf / restconf-nb / src / main / java / org / opendaylight / restconf / nb / rfc8040 / rests / services / impl / CreateStreamUtil.java
1 /*
2  * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.restconf.nb.rfc8040.rests.services.impl;
9
10 import static java.util.Objects.requireNonNull;
11
12 import com.google.common.collect.ImmutableSet;
13 import java.util.Collection;
14 import java.util.Set;
15 import java.util.stream.Collectors;
16 import org.eclipse.jdt.annotation.Nullable;
17 import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
18 import org.opendaylight.mdsal.dom.api.DOMMountPoint;
19 import org.opendaylight.mdsal.dom.api.DOMMountPointService;
20 import org.opendaylight.mdsal.dom.api.DOMNotificationService;
21 import org.opendaylight.mdsal.dom.api.DOMRpcResult;
22 import org.opendaylight.mdsal.dom.api.DOMSchemaService;
23 import org.opendaylight.mdsal.dom.spi.DefaultDOMRpcResult;
24 import org.opendaylight.restconf.common.errors.RestconfDocumentedException;
25 import org.opendaylight.restconf.nb.rfc8040.legacy.NormalizedNodePayload;
26 import org.opendaylight.restconf.nb.rfc8040.rests.utils.RestconfStreamsConstants;
27 import org.opendaylight.restconf.nb.rfc8040.streams.listeners.DeviceNotificationListenerAdaptor;
28 import org.opendaylight.restconf.nb.rfc8040.streams.listeners.ListenersBroker;
29 import org.opendaylight.restconf.nb.rfc8040.streams.listeners.NotificationListenerAdapter;
30 import org.opendaylight.restconf.nb.rfc8040.utils.parser.IdentifierCodec;
31 import org.opendaylight.yang.gen.v1.urn.opendaylight.device.notification.rev221106.SubscribeDeviceNotificationInput;
32 import org.opendaylight.yang.gen.v1.urn.opendaylight.device.notification.rev221106.SubscribeDeviceNotificationOutput;
33 import org.opendaylight.yang.gen.v1.urn.sal.restconf.event.subscription.rev140708.CreateDataChangeEventSubscriptionInput1.Scope;
34 import org.opendaylight.yang.gen.v1.urn.sal.restconf.event.subscription.rev140708.NotificationOutputTypeGrouping;
35 import org.opendaylight.yang.gen.v1.urn.sal.restconf.event.subscription.rev140708.NotificationOutputTypeGrouping.NotificationOutputType;
36 import org.opendaylight.yangtools.yang.common.ErrorTag;
37 import org.opendaylight.yangtools.yang.common.ErrorType;
38 import org.opendaylight.yangtools.yang.common.QName;
39 import org.opendaylight.yangtools.yang.common.QNameModule;
40 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
41 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.AugmentationIdentifier;
42 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifier;
43 import org.opendaylight.yangtools.yang.data.api.schema.AugmentationNode;
44 import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
45 import org.opendaylight.yangtools.yang.data.api.schema.DataContainerChild;
46 import org.opendaylight.yangtools.yang.data.api.schema.LeafNode;
47 import org.opendaylight.yangtools.yang.data.impl.schema.Builders;
48 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
49 import org.opendaylight.yangtools.yang.model.api.EffectiveModelContext;
50 import org.opendaylight.yangtools.yang.model.api.Module;
51 import org.opendaylight.yangtools.yang.model.api.NotificationDefinition;
52 import org.opendaylight.yangtools.yang.model.api.stmt.SchemaNodeIdentifier.Absolute;
53 import org.slf4j.Logger;
54 import org.slf4j.LoggerFactory;
55
56 /**
57  * Utility class for creation of data-change-event or YANG notification streams.
58  */
59 final class CreateStreamUtil {
60     private static final Logger LOG = LoggerFactory.getLogger(CreateStreamUtil.class);
61     private static final QNameModule SAL_REMOTE_AUGMENT = NotificationOutputTypeGrouping.QNAME.getModule();
62
63     private static final QNameModule DEVICE_NOTIFICATION_MODULE = SubscribeDeviceNotificationInput.QNAME.getModule();
64     private static final QName DATASTORE_QNAME =
65         QName.create(SAL_REMOTE_AUGMENT, RestconfStreamsConstants.DATASTORE_PARAM_NAME).intern();
66     private static final QName SCOPE_QNAME =
67         QName.create(SAL_REMOTE_AUGMENT, RestconfStreamsConstants.SCOPE_PARAM_NAME).intern();
68     private static final QName OUTPUT_TYPE_QNAME =
69         QName.create(SAL_REMOTE_AUGMENT, "notification-output-type").intern();
70     private static final QName DEVICE_NOTIFICATION_PATH_QNAME =
71         QName.create(DEVICE_NOTIFICATION_MODULE, "path").intern();
72     private static final QName DEVICE_NOTIFICATION_STREAM_PATH =
73         QName.create(DEVICE_NOTIFICATION_PATH_QNAME, "stream-path").intern();
74     private static final NodeIdentifier DATASTORE_NODEID = NodeIdentifier.create(DATASTORE_QNAME);
75     private static final NodeIdentifier SCOPE_NODEID = NodeIdentifier.create(SCOPE_QNAME);
76     private static final NodeIdentifier OUTPUT_TYPE_NODEID = NodeIdentifier.create(OUTPUT_TYPE_QNAME);
77     private static final NodeIdentifier DEVICE_NOTIFICATION_PATH_NODEID =
78         NodeIdentifier.create(DEVICE_NOTIFICATION_PATH_QNAME);
79     private static final AugmentationIdentifier SAL_REMOTE_AUG_IDENTIFIER = new AugmentationIdentifier(
80         ImmutableSet.of(SCOPE_QNAME, DATASTORE_QNAME, OUTPUT_TYPE_QNAME));
81
82     private CreateStreamUtil() {
83         // Hidden on purpose
84     }
85
86     /**
87      * Create data-change-event or notification stream with POST operation via RPC.
88      *
89      * @param payload      Input of RPC - example in JSON (data-change-event stream):
90      *                     <pre>
91      *                     {@code
92      *                         {
93      *                             "input": {
94      *                                 "path": "/toaster:toaster/toaster:toasterStatus",
95      *                                 "sal-remote-augment:datastore": "OPERATIONAL",
96      *                                 "sal-remote-augment:scope": "ONE"
97      *                             }
98      *                         }
99      *                     }
100      *                     </pre>
101      * @param refSchemaCtx Reference to {@link EffectiveModelContext}.
102      * @return {@link DOMRpcResult} - Output of RPC - example in JSON:
103      *     <pre>
104      *     {@code
105      *         {
106      *             "output": {
107      *                 "stream-name": "toaster:toaster/toaster:toasterStatus/datastore=OPERATIONAL/scope=ONE"
108      *             }
109      *         }
110      *     }
111      *     </pre>
112      */
113     static DOMRpcResult createDataChangeNotifiStream(final NormalizedNodePayload payload,
114             final EffectiveModelContext refSchemaCtx) {
115         // parsing out of container with settings and path
116         final ContainerNode data = (ContainerNode) requireNonNull(payload).getData();
117         final QName qname = payload.getInstanceIdentifierContext().getSchemaNode().getQName();
118         final YangInstanceIdentifier path = preparePath(data, qname);
119
120         // building of stream name
121         final StringBuilder streamNameBuilder = new StringBuilder(
122                 prepareDataChangeNotifiStreamName(path, requireNonNull(refSchemaCtx), data));
123         final NotificationOutputType outputType = prepareOutputType(data);
124         if (outputType.equals(NotificationOutputType.JSON)) {
125             streamNameBuilder.append('/').append(outputType.getName());
126         }
127         final String streamName = streamNameBuilder.toString();
128
129         // registration of the listener
130         ListenersBroker.getInstance().registerDataChangeListener(path, streamName, outputType);
131
132         // building of output
133         final QName outputQname = QName.create(qname, "output");
134         final QName streamNameQname = QName.create(qname, "stream-name");
135
136         return new DefaultDOMRpcResult(Builders.containerBuilder()
137             .withNodeIdentifier(new NodeIdentifier(outputQname))
138             .withChild(ImmutableNodes.leafNode(streamNameQname, streamName))
139             .build());
140     }
141
142     /**
143      * Create device notification stream.
144      *
145      * @param baseUrl base Url
146      * @param payload data
147      * @param refSchemaCtx Reference to {@link EffectiveModelContext}.
148      * @param streamUtil stream utility
149      * @param mountPointService dom mount point service
150      * @return {@link DOMRpcResult} - Output of RPC - example in JSON
151      */
152     static DOMRpcResult createDeviceNotificationListener(final String baseUrl, final NormalizedNodePayload payload,
153             final EffectiveModelContext refSchemaCtx, final SubscribeToStreamUtil streamUtil,
154             final DOMMountPointService mountPointService) {
155         // parsing out of container with settings and path
156         final ContainerNode data = (ContainerNode) requireNonNull(payload).getData();
157         final YangInstanceIdentifier value =
158             (YangInstanceIdentifier) data.findChildByArg(DEVICE_NOTIFICATION_PATH_NODEID)
159             .map(DataContainerChild::body)
160             .orElseThrow(() -> new RestconfDocumentedException("Mount point not available", ErrorType.APPLICATION,
161                 ErrorTag.OPERATION_FAILED));
162
163         // FIXME: just beautiful: a ClassCastException if it is something different!
164         final String deviceName =
165             ((YangInstanceIdentifier.NodeIdentifierWithPredicates.Singleton)value.getLastPathArgument())
166             .values().getElement().toString();
167
168         final DOMMountPoint mountPoint = mountPointService.getMountPoint(value)
169             .orElseThrow(() -> new RestconfDocumentedException("Mount point not available", ErrorType.APPLICATION,
170                 ErrorTag.OPERATION_FAILED));
171
172         final DOMNotificationService mountNotifService = mountPoint.getService(DOMNotificationService.class)
173             .orElseThrow(() -> new RestconfDocumentedException("Mount point does not support notifications",
174                 ErrorType.APPLICATION, ErrorTag.OPERATION_FAILED));
175
176         // FIXME: what is the relationship to the unused refSchemaCtx?
177         final EffectiveModelContext mountModelContext = mountPoint.getService(DOMSchemaService.class)
178             .orElseThrow(() -> new RestconfDocumentedException("Mount point schema not available",
179                 ErrorType.APPLICATION, ErrorTag.OPERATION_FAILED))
180             .getGlobalContext();
181         final Collection<? extends NotificationDefinition> notifications = mountModelContext.getNotifications();
182         if (notifications.isEmpty()) {
183             throw new RestconfDocumentedException("Device does not support notification", ErrorType.APPLICATION,
184                 ErrorTag.OPERATION_FAILED);
185         }
186
187         final Set<Absolute> absolutes = notifications.stream()
188             .map(notificationDefinition -> Absolute.of(notificationDefinition.getQName()))
189             .collect(Collectors.toUnmodifiableSet());
190
191         final DeviceNotificationListenerAdaptor notificationListenerAdapter = ListenersBroker.getInstance()
192             .registerDeviceNotificationListener(deviceName, prepareOutputType(data), mountModelContext,
193                 mountPointService, mountPoint.getIdentifier());
194         notificationListenerAdapter.listen(mountNotifService, absolutes);
195
196         // building of output
197         return new DefaultDOMRpcResult(Builders.containerBuilder()
198             .withNodeIdentifier(new NodeIdentifier(SubscribeDeviceNotificationOutput.QNAME))
199             .withChild(ImmutableNodes.leafNode(DEVICE_NOTIFICATION_STREAM_PATH, baseUrl + deviceName
200                 + "?" + RestconfStreamsConstants.NOTIFICATION_TYPE + "=" + RestconfStreamsConstants.DEVICE))
201             .build());
202     }
203
204     /**
205      * Prepare {@link NotificationOutputType}.
206      *
207      * @param data Container with stream settings (RPC create-stream).
208      * @return Parsed {@link NotificationOutputType}.
209      */
210     private static NotificationOutputType prepareOutputType(final ContainerNode data) {
211         final String outputName = extractStringLeaf(data, OUTPUT_TYPE_NODEID);
212         return outputName != null ? NotificationOutputType.valueOf(outputName) : NotificationOutputType.XML;
213     }
214
215     /**
216      * Prepare stream name.
217      *
218      * @param path          Path of element from which data-change-event notifications are going to be generated.
219      * @param schemaContext Schema context.
220      * @param data          Container with stream settings (RPC create-stream).
221      * @return Parsed stream name.
222      */
223     private static String prepareDataChangeNotifiStreamName(final YangInstanceIdentifier path,
224             final EffectiveModelContext schemaContext, final ContainerNode data) {
225         final String datastoreName = extractStringLeaf(data, DATASTORE_NODEID);
226         final LogicalDatastoreType datastoreType = datastoreName != null ? LogicalDatastoreType.valueOf(datastoreName)
227             : LogicalDatastoreType.CONFIGURATION;
228
229         final String scopeName = extractStringLeaf(data, SCOPE_NODEID);
230         // FIXME: this is not really used
231         final Scope scope = scopeName != null ? Scope.ofName(scopeName) : Scope.BASE;
232
233         return RestconfStreamsConstants.DATA_SUBSCRIPTION
234             + "/" + ListenersBroker.createStreamNameFromUri(IdentifierCodec.serialize(path, schemaContext)
235                 + "/" + RestconfStreamsConstants.DATASTORE_PARAM_NAME + "=" + datastoreType
236                 + "/" + RestconfStreamsConstants.SCOPE_PARAM_NAME + "=" + scope);
237     }
238
239     /**
240      * Prepare {@link YangInstanceIdentifier} of stream source.
241      *
242      * @param data          Container with stream settings (RPC create-stream).
243      * @param qualifiedName QName of the input RPC context (used only in debugging).
244      * @return Parsed {@link YangInstanceIdentifier} of data element from which the data-change-event notifications
245      *     are going to be generated.
246      */
247     private static YangInstanceIdentifier preparePath(final ContainerNode data, final QName qualifiedName) {
248         final Object pathValue = data.findChildByArg(new NodeIdentifier(QName.create(qualifiedName, "path")))
249             .map(DataContainerChild::body)
250             .orElse(null);
251         if (!(pathValue instanceof YangInstanceIdentifier)) {
252             LOG.debug("Instance identifier {} was not normalized correctly", qualifiedName);
253             throw new RestconfDocumentedException(
254                     "Instance identifier was not normalized correctly",
255                     ErrorType.APPLICATION,
256                     ErrorTag.OPERATION_FAILED);
257         }
258         return (YangInstanceIdentifier) pathValue;
259     }
260
261     private static @Nullable String extractStringLeaf(final ContainerNode data, final NodeIdentifier childName) {
262         final DataContainerChild augNode = data.childByArg(SAL_REMOTE_AUG_IDENTIFIER);
263         if (augNode instanceof AugmentationNode) {
264             final DataContainerChild enumNode = ((AugmentationNode) augNode).childByArg(childName);
265             if (enumNode instanceof LeafNode) {
266                 final Object value = enumNode.body();
267                 if (value instanceof String) {
268                     return (String) value;
269                 }
270             }
271         }
272         return null;
273     }
274
275     /**
276      * Create YANG notification stream using notification definition in YANG schema.
277      *
278      * @param notificationDefinition YANG notification definition.
279      * @param refSchemaCtx           Reference to {@link EffectiveModelContext}
280      * @param outputType             Output type (XML or JSON).
281      * @return {@link NotificationListenerAdapter}
282      */
283     static NotificationListenerAdapter createYangNotifiStream(final NotificationDefinition notificationDefinition,
284             final EffectiveModelContext refSchemaCtx, final NotificationOutputType outputType) {
285         final var streamName = parseNotificationStreamName(requireNonNull(notificationDefinition),
286                 requireNonNull(refSchemaCtx), requireNonNull(outputType.getName()));
287         final var listenersBroker = ListenersBroker.getInstance();
288
289         final var existing = listenersBroker.notificationListenerFor(streamName);
290         return existing != null ? existing
291             : listenersBroker.registerNotificationListener(
292                 Absolute.of(notificationDefinition.getQName()), streamName, outputType);
293     }
294
295     private static String parseNotificationStreamName(final NotificationDefinition notificationDefinition,
296             final EffectiveModelContext refSchemaCtx, final String outputType) {
297         final QName notificationDefinitionQName = notificationDefinition.getQName();
298         final Module module = refSchemaCtx.findModule(
299                 notificationDefinitionQName.getModule().getNamespace(),
300                 notificationDefinitionQName.getModule().getRevision()).orElse(null);
301         requireNonNull(module, String.format("Module for namespace %s does not exist.",
302                 notificationDefinitionQName.getModule().getNamespace()));
303
304         final StringBuilder streamNameBuilder = new StringBuilder();
305         streamNameBuilder.append(RestconfStreamsConstants.NOTIFICATION_STREAM)
306                 .append('/')
307                 .append(module.getName())
308                 .append(':')
309                 .append(notificationDefinitionQName.getLocalName());
310         if (outputType.equals(NotificationOutputType.JSON.getName())) {
311             streamNameBuilder.append('/').append(NotificationOutputType.JSON.getName());
312         }
313         return streamNameBuilder.toString();
314     }
315 }