2 * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.restconf.nb.rfc8040.rests.utils;
10 import com.google.common.base.Optional;
11 import com.google.common.base.Preconditions;
12 import com.google.common.util.concurrent.CheckedFuture;
13 import java.util.ArrayList;
14 import java.util.List;
15 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataChangeScope;
16 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
17 import org.opendaylight.controller.md.sal.dom.api.DOMRpcResult;
18 import org.opendaylight.controller.md.sal.dom.spi.DefaultDOMRpcResult;
19 import org.opendaylight.restconf.common.context.NormalizedNodeContext;
20 import org.opendaylight.restconf.common.errors.RestconfDocumentedException;
21 import org.opendaylight.restconf.common.errors.RestconfError.ErrorTag;
22 import org.opendaylight.restconf.common.errors.RestconfError.ErrorType;
23 import org.opendaylight.restconf.nb.rfc8040.references.SchemaContextRef;
24 import org.opendaylight.restconf.nb.rfc8040.streams.listeners.NotificationListenerAdapter;
25 import org.opendaylight.restconf.nb.rfc8040.streams.listeners.Notificator;
26 import org.opendaylight.restconf.nb.rfc8040.utils.parser.ParserIdentifier;
27 import org.opendaylight.yang.gen.v1.urn.sal.restconf.event.subscription.rev140708.NotificationOutputTypeGrouping.NotificationOutputType;
28 import org.opendaylight.yangtools.yang.common.QName;
29 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
30 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifier;
31 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.PathArgument;
32 import org.opendaylight.yangtools.yang.data.api.schema.AugmentationNode;
33 import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
34 import org.opendaylight.yangtools.yang.data.api.schema.DataContainerChild;
35 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
36 import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableContainerNodeBuilder;
37 import org.opendaylight.yangtools.yang.model.api.Module;
38 import org.opendaylight.yangtools.yang.model.api.NotificationDefinition;
39 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
40 import org.opendaylight.yangtools.yang.model.api.SchemaPath;
41 import org.slf4j.Logger;
42 import org.slf4j.LoggerFactory;
45 * Util class for streams.
53 public final class CreateStreamUtil {
55 private static final Logger LOG = LoggerFactory.getLogger(CreateStreamUtil.class);
56 private static final String OUTPUT_TYPE_PARAM_NAME = "notification-output-type";
58 private CreateStreamUtil() {
59 throw new UnsupportedOperationException("Util class");
63 * Create stream with POST operation via RPC.
66 * input of rpc - example in JSON:
72 * "path": "/toaster:toaster/toaster:toasterStatus",
73 * "sal-remote-augment:datastore": "OPERATIONAL",
74 * "sal-remote-augment:scope": "ONE"
81 * reference to {@link SchemaContext} -
82 * {@link SchemaContextRef}
83 * @return {@link CheckedFuture} with {@link DOMRpcResult} - This mean
84 * output of RPC - example in JSON:
90 * "stream-name": "toaster:toaster/toaster:toasterStatus/datastore=OPERATIONAL/scope=ONE"
97 public static DOMRpcResult createDataChangeNotifiStream(final NormalizedNodeContext payload,
98 final SchemaContextRef refSchemaCtx) {
99 final ContainerNode data = (ContainerNode) payload.getData();
100 final QName qname = payload.getInstanceIdentifierContext().getSchemaNode().getQName();
101 final YangInstanceIdentifier path = preparePath(data, qname);
102 String streamName = prepareDataChangeNotifiStreamName(path, refSchemaCtx.get(), data);
104 final QName outputQname = QName.create(qname, "output");
105 final QName streamNameQname = QName.create(qname, "stream-name");
107 final NotificationOutputType outputType = prepareOutputType(data);
108 if (outputType.equals(NotificationOutputType.JSON)) {
109 streamName = streamName + "/JSON";
112 if (!Notificator.existListenerFor(streamName)) {
113 Notificator.createListener(path, streamName, outputType);
116 final ContainerNode output =
117 ImmutableContainerNodeBuilder.create().withNodeIdentifier(new NodeIdentifier(outputQname))
118 .withChild(ImmutableNodes.leafNode(streamNameQname, streamName)).build();
119 return new DefaultDOMRpcResult(output);
123 * Prepare {@code NotificationOutputType}.
126 * data of notification
127 * @return output type fo notification
129 private static NotificationOutputType prepareOutputType(final ContainerNode data) {
130 NotificationOutputType outputType = parseEnum(data, NotificationOutputType.class, OUTPUT_TYPE_PARAM_NAME);
131 return outputType = outputType == null ? NotificationOutputType.XML : outputType;
134 private static String prepareDataChangeNotifiStreamName(final YangInstanceIdentifier path,
135 final SchemaContext schemaContext,
136 final ContainerNode data) {
137 LogicalDatastoreType ds = parseEnum(data, LogicalDatastoreType.class,
138 RestconfStreamsConstants.DATASTORE_PARAM_NAME);
139 ds = ds == null ? RestconfStreamsConstants.DEFAULT_DS : ds;
141 DataChangeScope scope = parseEnum(data, DataChangeScope.class, RestconfStreamsConstants.SCOPE_PARAM_NAME);
142 scope = scope == null ? RestconfStreamsConstants.DEFAULT_SCOPE : scope;
144 final String streamName = RestconfStreamsConstants.DATA_SUBSCR + "/"
146 .createStreamNameFromUri(ParserIdentifier.stringFromYangInstanceIdentifier(path, schemaContext)
147 + RestconfStreamsConstants.DS_URI + ds + RestconfStreamsConstants.SCOPE_URI + scope);
151 private static <T> T parseEnum(final ContainerNode data, final Class<T> clazz, final String paramName) {
152 final Optional<DataContainerChild<? extends PathArgument, ?>> optAugNode = data.getChild(
153 RestconfStreamsConstants.SAL_REMOTE_AUG_IDENTIFIER);
154 if (!optAugNode.isPresent()) {
157 final DataContainerChild<? extends PathArgument, ?> augNode = optAugNode.get();
158 if (!(augNode instanceof AugmentationNode)) {
161 final Optional<DataContainerChild<? extends PathArgument, ?>> enumNode = ((AugmentationNode) augNode).getChild(
162 new NodeIdentifier(QName.create(RestconfStreamsConstants.SAL_REMOTE_AUGMENT, paramName)));
163 if (!enumNode.isPresent()) {
166 final Object value = enumNode.get().getValue();
167 if (!(value instanceof String)) {
171 return ResolveEnumUtil.resolveEnum(clazz, (String) value);
174 private static YangInstanceIdentifier preparePath(final ContainerNode data, final QName qualifiedName) {
175 final Optional<DataContainerChild<? extends PathArgument, ?>> path = data
176 .getChild(new YangInstanceIdentifier.NodeIdentifier(QName.create(qualifiedName, "path")));
177 Object pathValue = null;
178 if (path.isPresent()) {
179 pathValue = path.get().getValue();
181 if (!(pathValue instanceof YangInstanceIdentifier)) {
182 final String errMsg = "Instance identifier was not normalized correctly ";
183 LOG.debug(errMsg + qualifiedName);
184 throw new RestconfDocumentedException(errMsg, ErrorType.APPLICATION, ErrorTag.OPERATION_FAILED);
186 return (YangInstanceIdentifier) pathValue;
190 * Create stream with POST operation via RPC.
192 * @param notificatinoDefinition
194 * @param refSchemaCtx
198 * @return {@link DOMRpcResult}
200 public static List<NotificationListenerAdapter> createYangNotifiStream(
201 final NotificationDefinition notificatinoDefinition, final SchemaContextRef refSchemaCtx,
202 final String outputType) {
203 final List<SchemaPath> paths = new ArrayList<>();
204 final QName notificatinoDefinitionQName = notificatinoDefinition.getQName();
205 final Module module =
206 refSchemaCtx.findModuleByNamespaceAndRevision(notificatinoDefinitionQName.getModule().getNamespace(),
207 notificatinoDefinitionQName.getModule().getRevision());
208 Preconditions.checkNotNull(module,
209 "Module for namespace " + notificatinoDefinitionQName.getModule().getNamespace() + " does not exist");
210 NotificationDefinition notifiDef = null;
211 for (final NotificationDefinition notification : module.getNotifications()) {
212 if (notification.getQName().equals(notificatinoDefinitionQName)) {
213 notifiDef = notification;
217 final String moduleName = module.getName();
218 Preconditions.checkNotNull(notifiDef,
219 "Notification " + notificatinoDefinitionQName + "doesn't exist in module " + moduleName);
220 paths.add(notifiDef.getPath());
221 String streamName = RestconfStreamsConstants.CREATE_NOTIFICATION_STREAM + "/";
222 streamName = streamName + moduleName + ":" + notificatinoDefinitionQName.getLocalName();
223 if (outputType.equals("JSON")) {
224 streamName = streamName + "/JSON";
227 if (Notificator.existNotificationListenerFor(streamName)) {
228 final List<NotificationListenerAdapter> notificationListenerFor =
229 Notificator.getNotificationListenerFor(streamName);
230 return SubscribeToStreamUtil.pickSpecificListenerByOutput(notificationListenerFor, outputType);
233 return Notificator.createNotificationListener(paths, streamName, outputType);