2 * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.restconf.nb.rfc8040.rests.services.impl;
10 import static java.util.Objects.requireNonNull;
12 import com.google.common.collect.ImmutableList;
13 import com.google.common.collect.ImmutableSet;
14 import java.util.Optional;
15 import org.eclipse.jdt.annotation.Nullable;
16 import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
17 import org.opendaylight.mdsal.dom.api.DOMRpcResult;
18 import org.opendaylight.mdsal.dom.spi.DefaultDOMRpcResult;
19 import org.opendaylight.restconf.common.errors.RestconfDocumentedException;
20 import org.opendaylight.restconf.nb.rfc8040.legacy.NormalizedNodePayload;
21 import org.opendaylight.restconf.nb.rfc8040.rests.utils.RestconfStreamsConstants;
22 import org.opendaylight.restconf.nb.rfc8040.streams.listeners.ListenersBroker;
23 import org.opendaylight.restconf.nb.rfc8040.streams.listeners.NotificationListenerAdapter;
24 import org.opendaylight.restconf.nb.rfc8040.utils.parser.IdentifierCodec;
25 import org.opendaylight.yang.gen.v1.urn.sal.restconf.event.subscription.rev140708.CreateDataChangeEventSubscriptionInput1.Scope;
26 import org.opendaylight.yang.gen.v1.urn.sal.restconf.event.subscription.rev140708.NotificationOutputTypeGrouping.NotificationOutputType;
27 import org.opendaylight.yangtools.yang.common.ErrorTag;
28 import org.opendaylight.yangtools.yang.common.ErrorType;
29 import org.opendaylight.yangtools.yang.common.QName;
30 import org.opendaylight.yangtools.yang.common.QNameModule;
31 import org.opendaylight.yangtools.yang.common.Revision;
32 import org.opendaylight.yangtools.yang.common.XMLNamespace;
33 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
34 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.AugmentationIdentifier;
35 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifier;
36 import org.opendaylight.yangtools.yang.data.api.schema.AugmentationNode;
37 import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
38 import org.opendaylight.yangtools.yang.data.api.schema.DataContainerChild;
39 import org.opendaylight.yangtools.yang.data.api.schema.LeafNode;
40 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
41 import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableContainerNodeBuilder;
42 import org.opendaylight.yangtools.yang.model.api.EffectiveModelContext;
43 import org.opendaylight.yangtools.yang.model.api.Module;
44 import org.opendaylight.yangtools.yang.model.api.NotificationDefinition;
45 import org.opendaylight.yangtools.yang.model.api.stmt.SchemaNodeIdentifier.Absolute;
46 import org.slf4j.Logger;
47 import org.slf4j.LoggerFactory;
50 * Utility class for creation of data-change-event or YANG notification streams.
52 final class CreateStreamUtil {
53 private static final Logger LOG = LoggerFactory.getLogger(CreateStreamUtil.class);
54 private static final QNameModule SAL_REMOTE_AUGMENT = QNameModule.create(
55 XMLNamespace.of("urn:sal:restconf:event:subscription"), Revision.of("2014-07-08"));
56 private static final QName DATASTORE_QNAME =
57 QName.create(SAL_REMOTE_AUGMENT, RestconfStreamsConstants.DATASTORE_PARAM_NAME).intern();
58 private static final QName SCOPE_QNAME =
59 QName.create(SAL_REMOTE_AUGMENT, RestconfStreamsConstants.SCOPE_PARAM_NAME).intern();
60 private static final QName OUTPUT_TYPE_QNAME =
61 QName.create(SAL_REMOTE_AUGMENT, "notification-output-type").intern();
62 private static final NodeIdentifier DATASTORE_NODEID = NodeIdentifier.create(DATASTORE_QNAME);
63 private static final NodeIdentifier SCOPE_NODEID = NodeIdentifier.create(SCOPE_QNAME);
64 private static final NodeIdentifier OUTPUT_TYPE_NODEID = NodeIdentifier.create(OUTPUT_TYPE_QNAME);
66 private static final AugmentationIdentifier SAL_REMOTE_AUG_IDENTIFIER = new AugmentationIdentifier(
67 ImmutableSet.of(SCOPE_QNAME, DATASTORE_QNAME, OUTPUT_TYPE_QNAME));
69 private CreateStreamUtil() {
74 * Create data-change-event or notification stream with POST operation via RPC.
76 * @param payload Input of RPC - example in JSON (data-change-event stream):
81 * "path": "/toaster:toaster/toaster:toasterStatus",
82 * "sal-remote-augment:datastore": "OPERATIONAL",
83 * "sal-remote-augment:scope": "ONE"
88 * @param refSchemaCtx Reference to {@link EffectiveModelContext}.
89 * @return {@link DOMRpcResult} - Output of RPC - example in JSON:
94 * "stream-name": "toaster:toaster/toaster:toasterStatus/datastore=OPERATIONAL/scope=ONE"
100 static DOMRpcResult createDataChangeNotifiStream(final NormalizedNodePayload payload,
101 final EffectiveModelContext refSchemaCtx) {
102 // parsing out of container with settings and path
103 final ContainerNode data = (ContainerNode) requireNonNull(payload).getData();
104 final QName qname = payload.getInstanceIdentifierContext().getSchemaNode().getQName();
105 final YangInstanceIdentifier path = preparePath(data, qname);
107 // building of stream name
108 final StringBuilder streamNameBuilder = new StringBuilder(
109 prepareDataChangeNotifiStreamName(path, requireNonNull(refSchemaCtx), data));
110 final NotificationOutputType outputType = prepareOutputType(data);
111 if (outputType.equals(NotificationOutputType.JSON)) {
112 streamNameBuilder.append('/').append(outputType.getName());
114 final String streamName = streamNameBuilder.toString();
116 // registration of the listener
117 ListenersBroker.getInstance().registerDataChangeListener(path, streamName, outputType);
119 // building of output
120 final QName outputQname = QName.create(qname, "output");
121 final QName streamNameQname = QName.create(qname, "stream-name");
123 final ContainerNode output = ImmutableContainerNodeBuilder.create()
124 .withNodeIdentifier(new NodeIdentifier(outputQname))
125 .withChild(ImmutableNodes.leafNode(streamNameQname, streamName)).build();
126 return new DefaultDOMRpcResult(output);
130 * Prepare {@link NotificationOutputType}.
132 * @param data Container with stream settings (RPC create-stream).
133 * @return Parsed {@link NotificationOutputType}.
135 private static NotificationOutputType prepareOutputType(final ContainerNode data) {
136 final String outputName = extractStringLeaf(data, OUTPUT_TYPE_NODEID);
137 return outputName != null ? NotificationOutputType.valueOf(outputName) : NotificationOutputType.XML;
141 * Prepare stream name.
143 * @param path Path of element from which data-change-event notifications are going to be generated.
144 * @param schemaContext Schema context.
145 * @param data Container with stream settings (RPC create-stream).
146 * @return Parsed stream name.
148 private static String prepareDataChangeNotifiStreamName(final YangInstanceIdentifier path,
149 final EffectiveModelContext schemaContext, final ContainerNode data) {
150 final String datastoreName = extractStringLeaf(data, DATASTORE_NODEID);
151 final LogicalDatastoreType datastoreType = datastoreName != null ? LogicalDatastoreType.valueOf(datastoreName)
152 : LogicalDatastoreType.CONFIGURATION;
154 final String scopeName = extractStringLeaf(data, SCOPE_NODEID);
155 // FIXME: this is not really used
156 final Scope scope = scopeName != null ? Scope.forName(scopeName).orElseThrow() : Scope.BASE;
158 return RestconfStreamsConstants.DATA_SUBSCRIPTION
159 + "/" + ListenersBroker.createStreamNameFromUri(IdentifierCodec.serialize(path, schemaContext)
160 + "/" + RestconfStreamsConstants.DATASTORE_PARAM_NAME + "=" + datastoreType
161 + "/" + RestconfStreamsConstants.SCOPE_PARAM_NAME + "=" + scope);
165 * Prepare {@link YangInstanceIdentifier} of stream source.
167 * @param data Container with stream settings (RPC create-stream).
168 * @param qualifiedName QName of the input RPC context (used only in debugging).
169 * @return Parsed {@link YangInstanceIdentifier} of data element from which the data-change-event notifications
170 * are going to be generated.
172 private static YangInstanceIdentifier preparePath(final ContainerNode data, final QName qualifiedName) {
173 final Object pathValue = data.findChildByArg(new NodeIdentifier(QName.create(qualifiedName, "path")))
174 .map(DataContainerChild::body)
176 if (!(pathValue instanceof YangInstanceIdentifier)) {
177 LOG.debug("Instance identifier {} was not normalized correctly", qualifiedName);
178 throw new RestconfDocumentedException(
179 "Instance identifier was not normalized correctly",
180 ErrorType.APPLICATION,
181 ErrorTag.OPERATION_FAILED);
183 return (YangInstanceIdentifier) pathValue;
186 private static @Nullable String extractStringLeaf(final ContainerNode data, final NodeIdentifier childName) {
187 final DataContainerChild augNode = data.childByArg(SAL_REMOTE_AUG_IDENTIFIER);
188 if (augNode instanceof AugmentationNode) {
189 final DataContainerChild enumNode = ((AugmentationNode) augNode).childByArg(childName);
190 if (enumNode instanceof LeafNode) {
191 final Object value = enumNode.body();
192 if (value instanceof String) {
193 return (String) value;
201 * Create YANG notification stream using notification definition in YANG schema.
203 * @param notificationDefinition YANG notification definition.
204 * @param refSchemaCtx Reference to {@link EffectiveModelContext}
205 * @param outputType Output type (XML or JSON).
206 * @return {@link NotificationListenerAdapter}
208 static NotificationListenerAdapter createYangNotifiStream(final NotificationDefinition notificationDefinition,
209 final EffectiveModelContext refSchemaCtx, final NotificationOutputType outputType) {
210 final String streamName = parseNotificationStreamName(requireNonNull(notificationDefinition),
211 requireNonNull(refSchemaCtx), requireNonNull(outputType.getName()));
212 final Optional<NotificationListenerAdapter> listenerForStreamName = ListenersBroker.getInstance()
213 .getNotificationListenerFor(streamName);
214 return listenerForStreamName.orElseGet(() -> ListenersBroker.getInstance().registerNotificationListener(
215 Absolute.of(ImmutableList.copyOf(notificationDefinition.getPath().getPathFromRoot())), streamName,
219 private static String parseNotificationStreamName(final NotificationDefinition notificationDefinition,
220 final EffectiveModelContext refSchemaCtx, final String outputType) {
221 final QName notificationDefinitionQName = notificationDefinition.getQName();
222 final Module module = refSchemaCtx.findModule(
223 notificationDefinitionQName.getModule().getNamespace(),
224 notificationDefinitionQName.getModule().getRevision()).orElse(null);
225 requireNonNull(module, String.format("Module for namespace %s does not exist.",
226 notificationDefinitionQName.getModule().getNamespace()));
228 final StringBuilder streamNameBuilder = new StringBuilder();
229 streamNameBuilder.append(RestconfStreamsConstants.NOTIFICATION_STREAM)
231 .append(module.getName())
233 .append(notificationDefinitionQName.getLocalName());
234 if (outputType.equals(NotificationOutputType.JSON.getName())) {
235 streamNameBuilder.append('/').append(NotificationOutputType.JSON.getName());
237 return streamNameBuilder.toString();