final RpcServiceHandler rpcServiceHandler, final NotificationServiceHandler notificationServiceHandler) {
this.delegRestOpsService = new RestconfOperationsServiceImpl(schemaCtxHandler, domMountPointServiceHandler);
this.delegRestSchService = new RestconfSchemaServiceImpl(schemaCtxHandler, domMountPointServiceHandler);
+ this.delegRestconfSubscrService = new RestconfStreamsSubscriptionServiceImpl(domDataBrokerHandler,
+ notificationServiceHandler, schemaCtxHandler, transactionChainHandler);
this.delegRestconfDataService =
- new RestconfDataServiceImpl(schemaCtxHandler, transactionChainHandler, domMountPointServiceHandler);
+ new RestconfDataServiceImpl(schemaCtxHandler, transactionChainHandler, domMountPointServiceHandler,
+ this.delegRestconfSubscrService);
this.delegRestconfInvokeOpsService =
new RestconfInvokeOperationsServiceImpl(rpcServiceHandler, schemaCtxHandler);
- this.delegRestconfSubscrService = new RestconfStreamsSubscriptionServiceImpl(domDataBrokerHandler,
- notificationServiceHandler, schemaCtxHandler, transactionChainHandler);
this.delegRestService = new RestconfImpl(schemaCtxHandler);
}
}
*/
package org.opendaylight.restconf.restful.services.impl;
+import static org.opendaylight.restconf.restful.utils.RestconfStreamsConstants.CREATE_NOTIFICATION_STREAM;
+import static org.opendaylight.restconf.restful.utils.RestconfStreamsConstants.STREAM_ACCESS_PATH_PART;
+import static org.opendaylight.restconf.restful.utils.RestconfStreamsConstants.STREAM_LOCATION_PATH_PART;
+import static org.opendaylight.restconf.restful.utils.RestconfStreamsConstants.STREAM_PATH;
+
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import java.text.SimpleDateFormat;
import org.opendaylight.restconf.handlers.SchemaContextHandler;
import org.opendaylight.restconf.handlers.TransactionChainHandler;
import org.opendaylight.restconf.restful.services.api.RestconfDataService;
+import org.opendaylight.restconf.restful.services.api.RestconfStreamsSubscriptionService;
import org.opendaylight.restconf.restful.transaction.TransactionVarsWrapper;
import org.opendaylight.restconf.restful.utils.DeleteDataTransactionUtil;
import org.opendaylight.restconf.restful.utils.PatchDataTransactionUtil;
import org.opendaylight.restconf.restful.utils.PutDataTransactionUtil;
import org.opendaylight.restconf.restful.utils.ReadDataTransactionUtil;
import org.opendaylight.restconf.restful.utils.RestconfDataServiceConstant;
+import org.opendaylight.restconf.utils.RestconfConstants;
import org.opendaylight.restconf.utils.parser.ParserIdentifier;
import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
import org.opendaylight.yangtools.yang.model.api.SchemaNode;
private final TransactionChainHandler transactionChainHandler;
private final DOMMountPointServiceHandler mountPointServiceHandler;
+ private final RestconfStreamsSubscriptionService delegRestconfSubscrService;
+
public RestconfDataServiceImpl(final SchemaContextHandler schemaContextHandler,
final TransactionChainHandler transactionChainHandler,
- final DOMMountPointServiceHandler mountPointServiceHandler) {
+ final DOMMountPointServiceHandler mountPointServiceHandler,
+ final RestconfStreamsSubscriptionService delegRestconfSubscrService) {
this.schemaContextHandler = schemaContextHandler;
this.transactionChainHandler = transactionChainHandler;
this.mountPointServiceHandler = mountPointServiceHandler;
+ this.delegRestconfSubscrService = delegRestconfSubscrService;
}
@Override
final TransactionVarsWrapper transactionNode = new TransactionVarsWrapper(
instanceIdentifier, mountPoint, transactionChain);
final NormalizedNode<?, ?> node =
- ReadDataTransactionUtil.readData(parameters.getContent(), transactionNode, withDefa);
+ ReadDataTransactionUtil.readData(identifier, parameters.getContent(), transactionNode, withDefa,
+ schemaContextRef, uriInfo);
+ if (identifier.contains(STREAM_PATH) && identifier.contains(STREAM_ACCESS_PATH_PART)
+ && identifier.contains(STREAM_LOCATION_PATH_PART)) {
+ final String value = (String) node.getValue();
+ final String streamName = value.substring(
+ value.indexOf(CREATE_NOTIFICATION_STREAM.toString() + RestconfConstants.SLASH),
+ value.length());
+ this.delegRestconfSubscrService.subscribeToStream(streamName, uriInfo);
+ }
if (node == null) {
throw new RestconfDocumentedException(
"Request could not be completed because the relevant data model content does not exist",
if (namespace.toString().equals(RestconfStreamsConstants.SAL_REMOTE_NAMESPACE)) {
if (identifier.contains(RestconfStreamsConstants.CREATE_DATA_SUBSCR)) {
response = CreateStreamUtil.createDataChangeNotifiStream(payload, refSchemaCtx);
- } else if (identifier.contains(RestconfStreamsConstants.CREATE_NOTIFICATION_STREAM)) {
- response = CreateStreamUtil.createYangNotifiStream(payload, refSchemaCtx);
} else {
throw new RestconfDocumentedException("Not supported operation", ErrorType.RPC,
ErrorTag.OPERATION_NOT_SUPPORTED);
import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.CheckedFuture;
import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Iterator;
import java.util.List;
import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataChangeScope;
import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
import org.opendaylight.netconf.sal.restconf.impl.RestconfDocumentedException;
import org.opendaylight.netconf.sal.restconf.impl.RestconfError.ErrorTag;
import org.opendaylight.netconf.sal.restconf.impl.RestconfError.ErrorType;
+import org.opendaylight.netconf.sal.streams.listeners.NotificationListenerAdapter;
import org.opendaylight.netconf.sal.streams.listeners.Notificator;
import org.opendaylight.restconf.common.references.SchemaContextRef;
import org.opendaylight.restconf.utils.parser.ParserIdentifier;
import org.opendaylight.yangtools.yang.data.api.schema.AugmentationNode;
import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
import org.opendaylight.yangtools.yang.data.api.schema.DataContainerChild;
-import org.opendaylight.yangtools.yang.data.api.schema.LeafSetEntryNode;
-import org.opendaylight.yangtools.yang.data.api.schema.LeafSetNode;
import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableContainerNodeBuilder;
import org.opendaylight.yangtools.yang.model.api.Module;
/**
* Create stream with POST operation via RPC
*
- * @param payload
+ * @param notificatinoDefinition
* - input of RPC
* @param refSchemaCtx
* - schemaContext
+ * @param outputType
* @return {@link DOMRpcResult}
*/
- public static DOMRpcResult createYangNotifiStream(final NormalizedNodeContext payload,
- final SchemaContextRef refSchemaCtx) {
- final ContainerNode data = (ContainerNode) payload.getData();
- LeafSetNode leafSet = null;
- String outputType = "XML";
- for (final DataContainerChild<? extends PathArgument, ?> dataChild : data.getValue()) {
- if (dataChild instanceof LeafSetNode) {
- leafSet = (LeafSetNode) dataChild;
- } else if (dataChild instanceof AugmentationNode) {
- outputType = (String) (((AugmentationNode) dataChild).getValue()).iterator().next().getValue();
- }
- }
-
- final Collection<LeafSetEntryNode> entryNodes = leafSet.getValue();
+ public static List<NotificationListenerAdapter> createYangNotifiStream(
+ final NotificationDefinition notificatinoDefinition, final SchemaContextRef refSchemaCtx,
+ final String outputType) {
final List<SchemaPath> paths = new ArrayList<>();
String streamName = RestconfStreamsConstants.CREATE_NOTIFICATION_STREAM + "/";
-
- final Iterator<LeafSetEntryNode> iterator = entryNodes.iterator();
- while (iterator.hasNext()) {
- final QName valueQName = QName.create((String) iterator.next().getValue());
- final Module module = refSchemaCtx.findModuleByNamespaceAndRevision(valueQName.getModule().getNamespace(),
- valueQName.getModule().getRevision());
- Preconditions.checkNotNull(module,
- "Module for namespace " + valueQName.getModule().getNamespace() + " does not exist");
- NotificationDefinition notifiDef = null;
- for (final NotificationDefinition notification : module.getNotifications()) {
- if (notification.getQName().equals(valueQName)) {
- notifiDef = notification;
- break;
- }
- }
- final String moduleName = module.getName();
- Preconditions.checkNotNull(notifiDef,
- "Notification " + valueQName + "doesn't exist in module " + moduleName);
- paths.add(notifiDef.getPath());
- streamName = streamName + moduleName + ":" + valueQName.getLocalName();
- if (iterator.hasNext()) {
- streamName = streamName + ",";
+ final QName notificatinoDefinitionQName = notificatinoDefinition.getQName();
+ final Module module =
+ refSchemaCtx.findModuleByNamespaceAndRevision(notificatinoDefinitionQName.getModule().getNamespace(),
+ notificatinoDefinitionQName.getModule().getRevision());
+ Preconditions.checkNotNull(module,
+ "Module for namespace " + notificatinoDefinitionQName.getModule().getNamespace() + " does not exist");
+ NotificationDefinition notifiDef = null;
+ for (final NotificationDefinition notification : module.getNotifications()) {
+ if (notification.getQName().equals(notificatinoDefinitionQName)) {
+ notifiDef = notification;
+ break;
}
}
+ final String moduleName = module.getName();
+ Preconditions.checkNotNull(notifiDef,
+ "Notification " + notificatinoDefinitionQName + "doesn't exist in module " + moduleName);
+ paths.add(notifiDef.getPath());
+ streamName = streamName + moduleName + ":" + notificatinoDefinitionQName.getLocalName();
if (outputType.equals("JSON")) {
streamName = streamName + "/JSON";
}
- final QName rpcQName = payload.getInstanceIdentifierContext().getSchemaNode().getQName();
- final QName outputQname = QName.create(rpcQName, "output");
- final QName streamNameQname = QName.create(rpcQName, "notification-stream-identifier");
-
- final ContainerNode output =
- ImmutableContainerNodeBuilder.create().withNodeIdentifier(new NodeIdentifier(outputQname))
- .withChild(ImmutableNodes.leafNode(streamNameQname, streamName)).build();
if (!Notificator.existNotificationListenerFor(streamName)) {
- Notificator.createNotificationListener(paths, streamName, outputType);
+ return Notificator.createNotificationListener(paths, streamName, outputType);
+ } else {
+ final List<NotificationListenerAdapter> notificationListenerFor =
+ Notificator.getNotificationListenerFor(streamName);
+ return SubscribeToStreamUtil.pickSpecificListenerByOutput(notificationListenerFor, outputType);
}
-
- return new DefaultDOMRpcResult(output);
}
}
*/
package org.opendaylight.restconf.restful.utils;
+import static org.opendaylight.restconf.restful.utils.RestconfStreamsConstants.STREAMS_PATH;
+import static org.opendaylight.restconf.restful.utils.RestconfStreamsConstants.STREAM_PATH_PART;
+
import com.google.common.base.Optional;
import com.google.common.primitives.Ints;
import com.google.common.util.concurrent.CheckedFuture;
+import java.net.URI;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import javax.ws.rs.core.UriInfo;
import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
+import org.opendaylight.controller.md.sal.dom.api.DOMDataReadWriteTransaction;
import org.opendaylight.netconf.sal.restconf.impl.ControllerContext;
import org.opendaylight.netconf.sal.restconf.impl.InstanceIdentifierContext;
import org.opendaylight.netconf.sal.restconf.impl.RestconfDocumentedException;
import org.opendaylight.netconf.sal.restconf.impl.RestconfError;
import org.opendaylight.netconf.sal.restconf.impl.WriterParameters;
import org.opendaylight.netconf.sal.restconf.impl.WriterParameters.WriterParametersBuilder;
+import org.opendaylight.netconf.sal.streams.listeners.NotificationListenerAdapter;
+import org.opendaylight.restconf.common.references.SchemaContextRef;
import org.opendaylight.restconf.restful.transaction.TransactionVarsWrapper;
+import org.opendaylight.restconf.utils.mapping.RestconfMappingNodeUtil;
import org.opendaylight.restconf.utils.parser.ParserFieldsParameter;
+import org.opendaylight.yang.gen.v1.urn.sal.restconf.event.subscription.rev140708.NotificationOutputTypeGrouping.NotificationOutputType;
import org.opendaylight.yangtools.yang.common.QName;
import org.opendaylight.yangtools.yang.common.QNameModule;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
import org.opendaylight.yangtools.yang.model.api.DataSchemaNode;
import org.opendaylight.yangtools.yang.model.api.LeafSchemaNode;
import org.opendaylight.yangtools.yang.model.api.ListSchemaNode;
+import org.opendaylight.yangtools.yang.model.api.NotificationDefinition;
import org.opendaylight.yangtools.yang.model.api.RpcDefinition;
import org.opendaylight.yangtools.yang.model.api.SchemaContext;
configMap.entrySet().stream().filter(x -> stateMap.containsKey(x.getKey())).forEach(
y -> builder.addChild((T) prepareData(y.getValue(), stateMap.get(y.getKey()))));
}
+
+ /**
+ * Read specific type of data from data store via transaction and if identifier read data from
+ * streams then put streams from actual schema context to datastore.
+ *
+ * @param identifier
+ * - identifier of data to read
+ * @param content
+ * - type of data to read (config, state, all)
+ * @param transactionNode
+ * - {@link TransactionVarsWrapper} - wrapper for variables
+ * @param withDefa
+ * - vaule of with-defaults parameter
+ * @param schemaContextRef
+ * - schema context
+ * @param uriInfo
+ * - uri info
+ * @return {@link NormalizedNode}
+ */
+ public static NormalizedNode<?, ?> readData(final String identifier, final String content,
+ final TransactionVarsWrapper transactionNode, final String withDefa,
+ final SchemaContextRef schemaContextRef, final UriInfo uriInfo) {
+ if (identifier.contains(STREAMS_PATH) && !identifier.contains(STREAM_PATH_PART)) {
+ final DOMDataReadWriteTransaction wTx = transactionNode.getTransactionChain().newReadWriteTransaction();
+ final SchemaContext schemaContext = schemaContextRef.get();
+ final boolean exist = SubscribeToStreamUtil.checkExist(schemaContext, wTx);
+
+ for (final NotificationDefinition notificationDefinition : schemaContextRef.get().getNotifications()) {
+ final List<NotificationListenerAdapter> notifiStreamXML =
+ CreateStreamUtil.createYangNotifiStream(notificationDefinition, schemaContextRef,
+ NotificationOutputType.XML.getName());
+ final List<NotificationListenerAdapter> notifiStreamJSON =
+ CreateStreamUtil.createYangNotifiStream(notificationDefinition, schemaContextRef,
+ NotificationOutputType.JSON.getName());
+ notifiStreamJSON.addAll(notifiStreamXML);
+
+ for (final NotificationListenerAdapter listener : notifiStreamJSON) {
+ final URI uri = SubscribeToStreamUtil.prepareUriByStreamName(uriInfo, listener.getStreamName());
+ final NormalizedNode mapToStreams =
+ RestconfMappingNodeUtil.mapYangNotificationStreamByIetfRestconfMonitoring(
+ listener.getSchemaPath().getLastComponent(), schemaContext.getNotifications(),
+ null, listener.getOutputType(), uri,
+ SubscribeToStreamUtil.getMonitoringModule(schemaContext), exist);
+ SubscribeToStreamUtil.writeDataToDS(schemaContext,
+ listener.getSchemaPath().getLastComponent().getLocalName(), wTx, exist,
+ mapToStreams);
+ }
+ }
+ SubscribeToStreamUtil.submitData(wTx);
+ }
+ return readData(content, transactionNode, withDefa);
+ }
}
public static final CharSequence NOTIFICATION_STREAM = "notification-stream";
public static final CharSequence CREATE_NOTIFICATION_STREAM = "create-" + NOTIFICATION_STREAM;
+ public static final String STREAMS_PATH = "ietf-restconf-monitoring:restconf-state/streams";
+ public static final String STREAM_PATH_PART = "/stream=";
+ public static final String STREAM_PATH = STREAMS_PATH + STREAM_PATH_PART;
+ public static final String STREAM_ACCESS_PATH_PART = "/access=";
+ public static final String STREAM_LOCATION_PATH_PART = "/location";
+
static {
Date eventSubscriptionAugRevision;
try {
import org.opendaylight.restconf.utils.RestconfConstants;
import org.opendaylight.restconf.utils.mapping.RestconfMappingNodeUtil;
import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.yang.types.rev130715.DateAndTime;
+import org.opendaylight.yang.gen.v1.urn.sal.restconf.event.subscription.rev140708.NotificationOutputTypeGrouping.NotificationOutputType;
import org.opendaylight.yangtools.concepts.ListenerRegistration;
import org.opendaylight.yangtools.yang.common.QName;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
if (Strings.isNullOrEmpty(streamName)) {
throw new RestconfDocumentedException("Stream name is empty.", ErrorType.PROTOCOL, ErrorTag.INVALID_VALUE);
}
- final List<NotificationListenerAdapter> listeners = Notificator.getNotificationListenerFor(streamName);
+ List<NotificationListenerAdapter> listeners = Notificator.getNotificationListenerFor(streamName);
+ if (identifier.contains(RestconfConstants.SLASH + NotificationOutputType.JSON.getName())) {
+ listeners = pickSpecificListenerByOutput(listeners, NotificationOutputType.JSON.getName());
+ } else {
+ listeners = pickSpecificListenerByOutput(listeners, NotificationOutputType.XML.getName());
+ }
if ((listeners == null) || listeners.isEmpty()) {
throw new RestconfDocumentedException("Stream was not found.", ErrorType.PROTOCOL,
ErrorTag.UNKNOWN_ELEMENT);
return uri;
}
+ static List<NotificationListenerAdapter>
+ pickSpecificListenerByOutput(final List<NotificationListenerAdapter> listeners, final String outputType) {
+ for (final NotificationListenerAdapter notificationListenerAdapter : listeners) {
+ if (notificationListenerAdapter.getOutputType().equals(outputType)) {
+ final List<NotificationListenerAdapter> list = new ArrayList<>();
+ list.add(notificationListenerAdapter);
+ return list;
+ }
+ }
+ return listeners;
+ }
+
/**
* Prepare InstanceIdentifierContext for Location leaf
*
}
@SuppressWarnings("rawtypes")
- private static void writeDataToDS(final SchemaContext schemaContext, final String name,
+ static void writeDataToDS(final SchemaContext schemaContext, final String name,
final DOMDataReadWriteTransaction wTx, final boolean exist, final NormalizedNode mapToStreams) {
String pathId = "";
if (exist) {
mapToStreams);
}
- private static void submitData(final DOMDataReadWriteTransaction wTx) {
+ static void submitData(final DOMDataReadWriteTransaction wTx) {
try {
wTx.submit().checkedGet();
} catch (final TransactionCommitFailedException e) {
return result;
}
- private static URI prepareUriByStreamName(final UriInfo uriInfo, final String streamName) {
+ static URI prepareUriByStreamName(final UriInfo uriInfo, final String streamName) {
final int port = SubscribeToStreamUtil.prepareNotificationPort();
final UriBuilder uriBuilder = uriInfo.getAbsolutePathBuilder();
return port;
}
- private static boolean checkExist(final SchemaContext schemaContext, final DOMDataReadWriteTransaction wTx) {
+ static boolean checkExist(final SchemaContext schemaContext, final DOMDataReadWriteTransaction wTx) {
boolean exist;
try {
exist = wTx.exists(LogicalDatastoreType.OPERATIONAL,
}
prepareLeafAndFillEntryBuilder(streamEntry,
listSchema.getDataChildByName(MonitoringModule.LEAF_REPLAY_SUPP_STREAM_QNAME), true);
- prepareLeafAndFillEntryBuilder(streamEntry,
+ if (start != null) {
+ prepareLeafAndFillEntryBuilder(streamEntry,
listSchema.getDataChildByName(MonitoringModule.LEAF_START_TIME_STREAM_QNAME),
new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss'Z'XXX").format(start));
+ }
prepareListAndFillEntryBuilder(streamEntry,
(ListSchemaNode) listSchema.getDataChildByName(MonitoringModule.LIST_ACCESS_STREAM_QNAME),
outputType, uri);
* terms of the Eclipse Public License v1.0 which accompanies this distribution,
* and is available at http://www.eclipse.org/legal/epl-v10.html
*/
-
package org.opendaylight.restconf.restful.services.impl;
import static org.junit.Assert.assertEquals;
import org.opendaylight.restconf.handlers.DOMMountPointServiceHandler;
import org.opendaylight.restconf.handlers.SchemaContextHandler;
import org.opendaylight.restconf.handlers.TransactionChainHandler;
+import org.opendaylight.restconf.restful.services.api.RestconfStreamsSubscriptionService;
import org.opendaylight.yangtools.yang.common.QName;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
private DOMDataBroker mountDataBroker;
@Mock
private DOMTransactionChain transactionChain;
+ @Mock
+ private RestconfStreamsSubscriptionService delegRestconfSubscrService;
@Before
public void setUp() throws Exception {
final SchemaContextHandler schemaContextHandler = new SchemaContextHandler(txHandler);
schemaContextHandler.onGlobalContextUpdated(this.contextRef.get());
- this.dataService = new RestconfDataServiceImpl(schemaContextHandler, this.transactionChainHandler, this.mountPointServiceHandler);
+ this.dataService = new RestconfDataServiceImpl(schemaContextHandler, this.transactionChainHandler,
+ this.mountPointServiceHandler, this.delegRestconfSubscrService);
doReturn(this.domTransactionChain).when(this.transactionChainHandler).get();
doReturn(this.read).when(this.domTransactionChain).newReadOnlyTransaction();
doReturn(this.readWrite).when(this.domTransactionChain).newReadWriteTransaction();