/*
 * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
 *
 * This program and the accompanying materials are made available under the
 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
 * and is available at http://www.eclipse.org/legal/epl-v10.html
 */
8 package org.opendaylight.restconf.nb.rfc8040.rests.services.impl;
10 import static java.util.Objects.requireNonNull;
12 import com.google.common.collect.ImmutableList;
13 import java.util.Optional;
14 import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
15 import org.opendaylight.mdsal.dom.api.DOMRpcResult;
16 import org.opendaylight.mdsal.dom.spi.DefaultDOMRpcResult;
17 import org.opendaylight.restconf.common.context.NormalizedNodeContext;
18 import org.opendaylight.restconf.common.errors.RestconfDocumentedException;
19 import org.opendaylight.restconf.common.errors.RestconfError.ErrorTag;
20 import org.opendaylight.restconf.common.errors.RestconfError.ErrorType;
21 import org.opendaylight.restconf.common.util.DataChangeScope;
22 import org.opendaylight.restconf.nb.rfc8040.rests.utils.ResolveEnumUtil;
23 import org.opendaylight.restconf.nb.rfc8040.rests.utils.RestconfStreamsConstants;
24 import org.opendaylight.restconf.nb.rfc8040.streams.listeners.ListenersBroker;
25 import org.opendaylight.restconf.nb.rfc8040.streams.listeners.NotificationListenerAdapter;
26 import org.opendaylight.restconf.nb.rfc8040.utils.parser.ParserIdentifier;
27 import org.opendaylight.yang.gen.v1.urn.sal.restconf.event.subscription.rev140708.NotificationOutputTypeGrouping.NotificationOutputType;
28 import org.opendaylight.yangtools.yang.common.QName;
29 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
30 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifier;
31 import org.opendaylight.yangtools.yang.data.api.schema.AugmentationNode;
32 import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
33 import org.opendaylight.yangtools.yang.data.api.schema.DataContainerChild;
34 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
35 import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableContainerNodeBuilder;
36 import org.opendaylight.yangtools.yang.model.api.EffectiveModelContext;
37 import org.opendaylight.yangtools.yang.model.api.Module;
38 import org.opendaylight.yangtools.yang.model.api.NotificationDefinition;
39 import org.opendaylight.yangtools.yang.model.api.stmt.SchemaNodeIdentifier.Absolute;
40 import org.slf4j.Logger;
41 import org.slf4j.LoggerFactory;
/**
 * Utility class for creation of data-change-event or YANG notification streams.
 */
46 final class CreateStreamUtil {
47 private static final Logger LOG = LoggerFactory.getLogger(CreateStreamUtil.class);
/**
 * Hidden constructor: this class only offers static helpers and is not meant to be instantiated.
 *
 * @throws UnsupportedOperationException always
 */
private CreateStreamUtil() {
    throw new UnsupportedOperationException("Utility class");
}
54 * Create data-change-event or notification stream with POST operation via RPC.
56 * @param payload Input of RPC - example in JSON (data-change-event stream):
61 * "path": "/toaster:toaster/toaster:toasterStatus",
62 * "sal-remote-augment:datastore": "OPERATIONAL",
63 * "sal-remote-augment:scope": "ONE"
68 * @param refSchemaCtx Reference to {@link EffectiveModelContext}.
69 * @return {@link DOMRpcResult} - Output of RPC - example in JSON:
74 * "stream-name": "toaster:toaster/toaster:toasterStatus/datastore=OPERATIONAL/scope=ONE"
80 static DOMRpcResult createDataChangeNotifiStream(final NormalizedNodeContext payload,
81 final EffectiveModelContext refSchemaCtx) {
82 // parsing out of container with settings and path
83 final ContainerNode data = (ContainerNode) requireNonNull(payload).getData();
84 final QName qname = payload.getInstanceIdentifierContext().getSchemaNode().getQName();
85 final YangInstanceIdentifier path = preparePath(data, qname);
87 // building of stream name
88 final StringBuilder streamNameBuilder = new StringBuilder(
89 prepareDataChangeNotifiStreamName(path, requireNonNull(refSchemaCtx), data));
90 final NotificationOutputType outputType = prepareOutputType(data);
91 if (outputType.equals(NotificationOutputType.JSON)) {
92 streamNameBuilder.append('/').append(outputType.getName());
94 final String streamName = streamNameBuilder.toString();
96 // registration of the listener
97 ListenersBroker.getInstance().registerDataChangeListener(path, streamName, outputType);
100 final QName outputQname = QName.create(qname, "output");
101 final QName streamNameQname = QName.create(qname, "stream-name");
103 final ContainerNode output = ImmutableContainerNodeBuilder.create()
104 .withNodeIdentifier(new NodeIdentifier(outputQname))
105 .withChild(ImmutableNodes.leafNode(streamNameQname, streamName)).build();
106 return new DefaultDOMRpcResult(output);
110 * Prepare {@link NotificationOutputType}.
112 * @param data Container with stream settings (RPC create-stream).
113 * @return Parsed {@link NotificationOutputType}.
115 private static NotificationOutputType prepareOutputType(final ContainerNode data) {
116 NotificationOutputType outputType = parseEnum(
117 data, NotificationOutputType.class, RestconfStreamsConstants.OUTPUT_TYPE_PARAM_NAME);
118 return outputType == null ? NotificationOutputType.XML : outputType;
122 * Prepare stream name.
124 * @param path Path of element from which data-change-event notifications are going to be generated.
125 * @param schemaContext Schema context.
126 * @param data Container with stream settings (RPC create-stream).
127 * @return Parsed stream name.
129 private static String prepareDataChangeNotifiStreamName(final YangInstanceIdentifier path,
130 final EffectiveModelContext schemaContext, final ContainerNode data) {
131 LogicalDatastoreType datastoreType = parseEnum(
132 data, LogicalDatastoreType.class, RestconfStreamsConstants.DATASTORE_PARAM_NAME);
133 datastoreType = datastoreType == null ? LogicalDatastoreType.CONFIGURATION : datastoreType;
135 DataChangeScope scope = parseEnum(data, DataChangeScope.class, RestconfStreamsConstants.SCOPE_PARAM_NAME);
136 scope = scope == null ? DataChangeScope.BASE : scope;
138 return RestconfStreamsConstants.DATA_SUBSCRIPTION
140 + ListenersBroker.createStreamNameFromUri(
141 ParserIdentifier.stringFromYangInstanceIdentifier(path, schemaContext)
142 + RestconfStreamsConstants.DS_URI
144 + RestconfStreamsConstants.SCOPE_URI
149 * Prepare {@link YangInstanceIdentifier} of stream source.
151 * @param data Container with stream settings (RPC create-stream).
152 * @param qualifiedName QName of the input RPC context (used only in debugging).
153 * @return Parsed {@link YangInstanceIdentifier} of data element from which the data-change-event notifications
154 * are going to be generated.
156 private static YangInstanceIdentifier preparePath(final ContainerNode data, final QName qualifiedName) {
157 final Optional<DataContainerChild> path = data.findChildByArg(
158 new NodeIdentifier(QName.create(qualifiedName, RestconfStreamsConstants.STREAM_PATH_PARAM_NAME)));
159 Object pathValue = null;
160 if (path.isPresent()) {
161 pathValue = path.get().body();
163 if (!(pathValue instanceof YangInstanceIdentifier)) {
164 LOG.debug("Instance identifier {} was not normalized correctly", qualifiedName);
165 throw new RestconfDocumentedException(
166 "Instance identifier was not normalized correctly",
167 ErrorType.APPLICATION,
168 ErrorTag.OPERATION_FAILED);
170 return (YangInstanceIdentifier) pathValue;
174 * Parsing out of enumeration from RPC create-stream body.
176 * @param data Container with stream settings (RPC create-stream).
177 * @param clazz Enum type to be parsed out from input container.
178 * @param paramName Local name of the enum element.
179 * @return Parsed enumeration.
181 private static <T> T parseEnum(final ContainerNode data, final Class<T> clazz, final String paramName) {
182 final Optional<DataContainerChild> optAugNode = data.findChildByArg(
183 RestconfStreamsConstants.SAL_REMOTE_AUG_IDENTIFIER);
184 if (optAugNode.isEmpty()) {
187 final DataContainerChild augNode = optAugNode.get();
188 if (!(augNode instanceof AugmentationNode)) {
191 final Optional<DataContainerChild> enumNode = ((AugmentationNode) augNode).findChildByArg(
192 new NodeIdentifier(QName.create(RestconfStreamsConstants.SAL_REMOTE_AUGMENT, paramName)));
193 if (enumNode.isEmpty()) {
196 final Object value = enumNode.get().body();
197 if (!(value instanceof String)) {
201 return ResolveEnumUtil.resolveEnum(clazz, (String) value);
205 * Create YANG notification stream using notification definition in YANG schema.
207 * @param notificationDefinition YANG notification definition.
208 * @param refSchemaCtx Reference to {@link EffectiveModelContext}
209 * @param outputType Output type (XML or JSON).
210 * @return {@link NotificationListenerAdapter}
212 static NotificationListenerAdapter createYangNotifiStream(final NotificationDefinition notificationDefinition,
213 final EffectiveModelContext refSchemaCtx, final NotificationOutputType outputType) {
214 final String streamName = parseNotificationStreamName(requireNonNull(notificationDefinition),
215 requireNonNull(refSchemaCtx), requireNonNull(outputType.getName()));
216 final Optional<NotificationListenerAdapter> listenerForStreamName = ListenersBroker.getInstance()
217 .getNotificationListenerFor(streamName);
218 return listenerForStreamName.orElseGet(() -> ListenersBroker.getInstance().registerNotificationListener(
219 Absolute.of(ImmutableList.copyOf(notificationDefinition.getPath().getPathFromRoot())), streamName,
223 private static String parseNotificationStreamName(final NotificationDefinition notificationDefinition,
224 final EffectiveModelContext refSchemaCtx, final String outputType) {
225 final QName notificationDefinitionQName = notificationDefinition.getQName();
226 final Module module = refSchemaCtx.findModule(
227 notificationDefinitionQName.getModule().getNamespace(),
228 notificationDefinitionQName.getModule().getRevision()).orElse(null);
229 requireNonNull(module, String.format("Module for namespace %s does not exist.",
230 notificationDefinitionQName.getModule().getNamespace()));
232 final StringBuilder streamNameBuilder = new StringBuilder();
233 streamNameBuilder.append(RestconfStreamsConstants.NOTIFICATION_STREAM)
235 .append(module.getName())
237 .append(notificationDefinitionQName.getLocalName());
238 if (outputType.equals(NotificationOutputType.JSON.getName())) {
239 streamNameBuilder.append('/').append(NotificationOutputType.JSON.getName());
241 return streamNameBuilder.toString();