d3dc392da0c27aad23dfaab7fb39f07200b441e6
[netconf.git] / restconf / restconf-nb-rfc8040 / src / main / java / org / opendaylight / restconf / nb / rfc8040 / rests / services / impl / CreateStreamUtil.java
1 /*
2  * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.restconf.nb.rfc8040.rests.services.impl;
9
10 import static java.util.Objects.requireNonNull;
11
12 import com.google.common.collect.ImmutableList;
13 import java.util.Optional;
14 import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
15 import org.opendaylight.mdsal.dom.api.DOMRpcResult;
16 import org.opendaylight.mdsal.dom.spi.DefaultDOMRpcResult;
17 import org.opendaylight.restconf.common.context.NormalizedNodeContext;
18 import org.opendaylight.restconf.common.errors.RestconfDocumentedException;
19 import org.opendaylight.restconf.common.errors.RestconfError.ErrorTag;
20 import org.opendaylight.restconf.common.errors.RestconfError.ErrorType;
21 import org.opendaylight.restconf.common.util.DataChangeScope;
22 import org.opendaylight.restconf.nb.rfc8040.rests.utils.ResolveEnumUtil;
23 import org.opendaylight.restconf.nb.rfc8040.rests.utils.RestconfStreamsConstants;
24 import org.opendaylight.restconf.nb.rfc8040.streams.listeners.ListenersBroker;
25 import org.opendaylight.restconf.nb.rfc8040.streams.listeners.NotificationListenerAdapter;
26 import org.opendaylight.restconf.nb.rfc8040.utils.parser.ParserIdentifier;
27 import org.opendaylight.yang.gen.v1.urn.sal.restconf.event.subscription.rev140708.NotificationOutputTypeGrouping.NotificationOutputType;
28 import org.opendaylight.yangtools.yang.common.QName;
29 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
30 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifier;
31 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.PathArgument;
32 import org.opendaylight.yangtools.yang.data.api.schema.AugmentationNode;
33 import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
34 import org.opendaylight.yangtools.yang.data.api.schema.DataContainerChild;
35 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
36 import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableContainerNodeBuilder;
37 import org.opendaylight.yangtools.yang.model.api.EffectiveModelContext;
38 import org.opendaylight.yangtools.yang.model.api.Module;
39 import org.opendaylight.yangtools.yang.model.api.NotificationDefinition;
40 import org.opendaylight.yangtools.yang.model.api.stmt.SchemaNodeIdentifier.Absolute;
41 import org.slf4j.Logger;
42 import org.slf4j.LoggerFactory;
43
44 /**
45  * Utility class for creation of data-change-event or YANG notification streams.
46  */
47 final class CreateStreamUtil {
48     private static final Logger LOG = LoggerFactory.getLogger(CreateStreamUtil.class);
49
50     private CreateStreamUtil() {
51         throw new UnsupportedOperationException("Utility class");
52     }
53
54     /**
55      * Create data-change-event or notification stream with POST operation via RPC.
56      *
57      * @param payload      Input of RPC - example in JSON (data-change-event stream):
58      *                     <pre>
59      *                     {@code
60      *                         {
61      *                             "input": {
62      *                                 "path": "/toaster:toaster/toaster:toasterStatus",
63      *                                 "sal-remote-augment:datastore": "OPERATIONAL",
64      *                                 "sal-remote-augment:scope": "ONE"
65      *                             }
66      *                         }
67      *                     }
68      *                     </pre>
69      * @param refSchemaCtx Reference to {@link EffectiveModelContext}.
70      * @return {@link DOMRpcResult} - Output of RPC - example in JSON:
71      *     <pre>
72      *     {@code
73      *         {
74      *             "output": {
75      *                 "stream-name": "toaster:toaster/toaster:toasterStatus/datastore=OPERATIONAL/scope=ONE"
76      *             }
77      *         }
78      *     }
79      *     </pre>
80      */
81     static DOMRpcResult createDataChangeNotifiStream(final NormalizedNodeContext payload,
82             final EffectiveModelContext refSchemaCtx) {
83         // parsing out of container with settings and path
84         final ContainerNode data = (ContainerNode) requireNonNull(payload).getData();
85         final QName qname = payload.getInstanceIdentifierContext().getSchemaNode().getQName();
86         final YangInstanceIdentifier path = preparePath(data, qname);
87
88         // building of stream name
89         final StringBuilder streamNameBuilder = new StringBuilder(
90                 prepareDataChangeNotifiStreamName(path, requireNonNull(refSchemaCtx), data));
91         final NotificationOutputType outputType = prepareOutputType(data);
92         if (outputType.equals(NotificationOutputType.JSON)) {
93             streamNameBuilder.append('/').append(outputType.getName());
94         }
95         final String streamName = streamNameBuilder.toString();
96
97         // registration of the listener
98         ListenersBroker.getInstance().registerDataChangeListener(path, streamName, outputType);
99
100         // building of output
101         final QName outputQname = QName.create(qname, "output");
102         final QName streamNameQname = QName.create(qname, "stream-name");
103
104         final ContainerNode output = ImmutableContainerNodeBuilder.create()
105                 .withNodeIdentifier(new NodeIdentifier(outputQname))
106                 .withChild(ImmutableNodes.leafNode(streamNameQname, streamName)).build();
107         return new DefaultDOMRpcResult(output);
108     }
109
110     /**
111      * Prepare {@link NotificationOutputType}.
112      *
113      * @param data Container with stream settings (RPC create-stream).
114      * @return Parsed {@link NotificationOutputType}.
115      */
116     private static NotificationOutputType prepareOutputType(final ContainerNode data) {
117         NotificationOutputType outputType = parseEnum(
118                 data, NotificationOutputType.class, RestconfStreamsConstants.OUTPUT_TYPE_PARAM_NAME);
119         return outputType == null ? NotificationOutputType.XML : outputType;
120     }
121
122     /**
123      * Prepare stream name.
124      *
125      * @param path          Path of element from which data-change-event notifications are going to be generated.
126      * @param schemaContext Schema context.
127      * @param data          Container with stream settings (RPC create-stream).
128      * @return Parsed stream name.
129      */
130     private static String prepareDataChangeNotifiStreamName(final YangInstanceIdentifier path,
131             final EffectiveModelContext schemaContext, final ContainerNode data) {
132         LogicalDatastoreType datastoreType = parseEnum(
133                 data, LogicalDatastoreType.class, RestconfStreamsConstants.DATASTORE_PARAM_NAME);
134         datastoreType = datastoreType == null ? LogicalDatastoreType.CONFIGURATION : datastoreType;
135
136         DataChangeScope scope = parseEnum(data, DataChangeScope.class, RestconfStreamsConstants.SCOPE_PARAM_NAME);
137         scope = scope == null ? DataChangeScope.BASE : scope;
138
139         return RestconfStreamsConstants.DATA_SUBSCRIPTION
140                 + "/"
141                 + ListenersBroker.createStreamNameFromUri(
142                 ParserIdentifier.stringFromYangInstanceIdentifier(path, schemaContext)
143                         + RestconfStreamsConstants.DS_URI
144                         + datastoreType
145                         + RestconfStreamsConstants.SCOPE_URI
146                         + scope);
147     }
148
149     /**
150      * Prepare {@link YangInstanceIdentifier} of stream source.
151      *
152      * @param data          Container with stream settings (RPC create-stream).
153      * @param qualifiedName QName of the input RPC context (used only in debugging).
154      * @return Parsed {@link YangInstanceIdentifier} of data element from which the data-change-event notifications
155      *     are going to be generated.
156      */
157     private static YangInstanceIdentifier preparePath(final ContainerNode data, final QName qualifiedName) {
158         final Optional<DataContainerChild<? extends PathArgument, ?>> path = data.getChild(
159                 new YangInstanceIdentifier.NodeIdentifier(QName.create(
160                         qualifiedName,
161                         RestconfStreamsConstants.STREAM_PATH_PARAM_NAME)));
162         Object pathValue = null;
163         if (path.isPresent()) {
164             pathValue = path.get().getValue();
165         }
166         if (!(pathValue instanceof YangInstanceIdentifier)) {
167             LOG.debug("Instance identifier {} was not normalized correctly", qualifiedName);
168             throw new RestconfDocumentedException(
169                     "Instance identifier was not normalized correctly",
170                     ErrorType.APPLICATION,
171                     ErrorTag.OPERATION_FAILED);
172         }
173         return (YangInstanceIdentifier) pathValue;
174     }
175
176     /**
177      * Parsing out of enumeration from RPC create-stream body.
178      *
179      * @param data      Container with stream settings (RPC create-stream).
180      * @param clazz     Enum type to be parsed out from input container.
181      * @param paramName Local name of the enum element.
182      * @return Parsed enumeration.
183      */
184     private static <T> T parseEnum(final ContainerNode data, final Class<T> clazz, final String paramName) {
185         final Optional<DataContainerChild<? extends PathArgument, ?>> optAugNode = data.getChild(
186                 RestconfStreamsConstants.SAL_REMOTE_AUG_IDENTIFIER);
187         if (!optAugNode.isPresent()) {
188             return null;
189         }
190         final DataContainerChild<? extends PathArgument, ?> augNode = optAugNode.get();
191         if (!(augNode instanceof AugmentationNode)) {
192             return null;
193         }
194         final Optional<DataContainerChild<? extends PathArgument, ?>> enumNode = ((AugmentationNode) augNode).getChild(
195                 new NodeIdentifier(QName.create(RestconfStreamsConstants.SAL_REMOTE_AUGMENT, paramName)));
196         if (!enumNode.isPresent()) {
197             return null;
198         }
199         final Object value = enumNode.get().getValue();
200         if (!(value instanceof String)) {
201             return null;
202         }
203
204         return ResolveEnumUtil.resolveEnum(clazz, (String) value);
205     }
206
207     /**
208      * Create YANG notification stream using notification definition in YANG schema.
209      *
210      * @param notificationDefinition YANG notification definition.
211      * @param refSchemaCtx           Reference to {@link EffectiveModelContext}
212      * @param outputType             Output type (XML or JSON).
213      * @return {@link NotificationListenerAdapter}
214      */
215     static NotificationListenerAdapter createYangNotifiStream(final NotificationDefinition notificationDefinition,
216             final EffectiveModelContext refSchemaCtx, final NotificationOutputType outputType) {
217         final String streamName = parseNotificationStreamName(requireNonNull(notificationDefinition),
218                 requireNonNull(refSchemaCtx), requireNonNull(outputType.getName()));
219         final Optional<NotificationListenerAdapter> listenerForStreamName = ListenersBroker.getInstance()
220                 .getNotificationListenerFor(streamName);
221         return listenerForStreamName.orElseGet(() -> ListenersBroker.getInstance().registerNotificationListener(
222                 Absolute.of(ImmutableList.copyOf(notificationDefinition.getPath().getPathFromRoot())), streamName,
223                 outputType));
224     }
225
226     private static String parseNotificationStreamName(final NotificationDefinition notificationDefinition,
227             final EffectiveModelContext refSchemaCtx, final String outputType) {
228         final QName notificationDefinitionQName = notificationDefinition.getQName();
229         final Module module = refSchemaCtx.findModule(
230                 notificationDefinitionQName.getModule().getNamespace(),
231                 notificationDefinitionQName.getModule().getRevision()).orElse(null);
232         requireNonNull(module, String.format("Module for namespace %s does not exist.",
233                 notificationDefinitionQName.getModule().getNamespace()));
234
235         final StringBuilder streamNameBuilder = new StringBuilder();
236         streamNameBuilder.append(RestconfStreamsConstants.NOTIFICATION_STREAM)
237                 .append('/')
238                 .append(module.getName())
239                 .append(':')
240                 .append(notificationDefinitionQName.getLocalName());
241         if (outputType.equals(NotificationOutputType.JSON.getName())) {
242             streamNameBuilder.append('/').append(NotificationOutputType.JSON.getName());
243         }
244         return streamNameBuilder.toString();
245     }
246 }