Move SubscribeToStreamUtil to rests.services.impl
[netconf.git] / restconf / restconf-nb-rfc8040 / src / main / java / org / opendaylight / restconf / nb / rfc8040 / rests / services / impl / RestconfDataServiceImpl.java
index 48019124389d7e8077c4b47ca2c97466a1e0e167..819cde9f6274c3d425eddb35cd05b074ddd61dad 100644 (file)
@@ -11,10 +11,13 @@ import static java.util.Objects.requireNonNull;
 import static org.opendaylight.restconf.nb.rfc8040.rests.utils.RestconfDataServiceConstant.PostPutQueryParameters.INSERT;
 import static org.opendaylight.restconf.nb.rfc8040.rests.utils.RestconfDataServiceConstant.PostPutQueryParameters.POINT;
 import static org.opendaylight.restconf.nb.rfc8040.rests.utils.RestconfStreamsConstants.NOTIFICATION_STREAM;
+import static org.opendaylight.restconf.nb.rfc8040.rests.utils.RestconfStreamsConstants.STREAMS_PATH;
 import static org.opendaylight.restconf.nb.rfc8040.rests.utils.RestconfStreamsConstants.STREAM_ACCESS_PATH_PART;
 import static org.opendaylight.restconf.nb.rfc8040.rests.utils.RestconfStreamsConstants.STREAM_LOCATION_PATH_PART;
 import static org.opendaylight.restconf.nb.rfc8040.rests.utils.RestconfStreamsConstants.STREAM_PATH;
+import static org.opendaylight.restconf.nb.rfc8040.rests.utils.RestconfStreamsConstants.STREAM_PATH_PART;
 
+import java.net.URI;
 import java.time.Clock;
 import java.time.LocalDateTime;
 import java.time.format.DateTimeFormatter;
@@ -29,6 +32,7 @@ import org.eclipse.jdt.annotation.NonNull;
 import org.eclipse.jdt.annotation.Nullable;
 import org.opendaylight.mdsal.dom.api.DOMActionResult;
 import org.opendaylight.mdsal.dom.api.DOMDataBroker;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeReadWriteTransaction;
 import org.opendaylight.mdsal.dom.api.DOMMountPoint;
 import org.opendaylight.restconf.common.context.InstanceIdentifierContext;
 import org.opendaylight.restconf.common.context.NormalizedNodeContext;
@@ -46,6 +50,7 @@ import org.opendaylight.restconf.nb.rfc8040.handlers.TransactionChainHandler;
 import org.opendaylight.restconf.nb.rfc8040.rests.services.api.RestconfDataService;
 import org.opendaylight.restconf.nb.rfc8040.rests.services.api.RestconfStreamsSubscriptionService;
 import org.opendaylight.restconf.nb.rfc8040.rests.transactions.TransactionVarsWrapper;
+import org.opendaylight.restconf.nb.rfc8040.rests.utils.CreateStreamUtil;
 import org.opendaylight.restconf.nb.rfc8040.rests.utils.DeleteDataTransactionUtil;
 import org.opendaylight.restconf.nb.rfc8040.rests.utils.PatchDataTransactionUtil;
 import org.opendaylight.restconf.nb.rfc8040.rests.utils.PlainPatchDataTransactionUtil;
@@ -54,8 +59,11 @@ import org.opendaylight.restconf.nb.rfc8040.rests.utils.PutDataTransactionUtil;
 import org.opendaylight.restconf.nb.rfc8040.rests.utils.ReadDataTransactionUtil;
 import org.opendaylight.restconf.nb.rfc8040.rests.utils.RestconfDataServiceConstant;
 import org.opendaylight.restconf.nb.rfc8040.rests.utils.RestconfInvokeOperationsUtil;
+import org.opendaylight.restconf.nb.rfc8040.streams.listeners.NotificationListenerAdapter;
 import org.opendaylight.restconf.nb.rfc8040.utils.RestconfConstants;
+import org.opendaylight.restconf.nb.rfc8040.utils.mapping.RestconfMappingNodeUtil;
 import org.opendaylight.restconf.nb.rfc8040.utils.parser.ParserIdentifier;
+import org.opendaylight.yang.gen.v1.urn.sal.restconf.event.subscription.rev140708.NotificationOutputTypeGrouping.NotificationOutputType;
 import org.opendaylight.yangtools.concepts.Immutable;
 import org.opendaylight.yangtools.yang.common.QName;
 import org.opendaylight.yangtools.yang.common.Revision;
@@ -64,6 +72,7 @@ import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
 import org.opendaylight.yangtools.yang.model.api.ActionDefinition;
 import org.opendaylight.yangtools.yang.model.api.EffectiveModelContext;
+import org.opendaylight.yangtools.yang.model.api.NotificationDefinition;
 import org.opendaylight.yangtools.yang.model.api.SchemaNode;
 import org.opendaylight.yangtools.yang.model.api.SchemaPath;
 import org.slf4j.Logger;
@@ -137,7 +146,7 @@ public class RestconfDataServiceImpl implements RestconfDataService {
         final DOMMountPoint mountPoint = instanceIdentifier.getMountPoint();
         final TransactionVarsWrapper transactionNode = new TransactionVarsWrapper(
                 instanceIdentifier, mountPoint, getTransactionChainHandler(mountPoint));
-        final NormalizedNode<?, ?> node = ReadDataTransactionUtil.readData(identifier, parameters.getContent(),
+        final NormalizedNode<?, ?> node = readData(identifier, parameters.getContent(),
                 transactionNode, parameters.getWithDefault(), schemaContextRef, uriInfo);
         if (identifier != null && identifier.contains(STREAM_PATH) && identifier.contains(STREAM_ACCESS_PATH_PART)
                 && identifier.contains(STREAM_LOCATION_PATH_PART)) {
@@ -167,6 +176,64 @@ public class RestconfDataServiceImpl implements RestconfDataService {
         return Response.status(200).entity(new NormalizedNodeContext(instanceIdentifier, node, parameters)).build();
     }
 
+
+    /**
+     * Read specific type of data from data store via transaction and if identifier read data from
+     * streams then put streams from actual schema context to datastore.
+     *
+     * @param identifier
+     *             identifier of data to read
+     * @param content
+     *             type of data to read (config, state, all)
+     * @param transactionNode
+     *             {@link TransactionVarsWrapper} - wrapper for variables
+     * @param withDefa
+     *             vaule of with-defaults parameter
+     * @param schemaContext
+     *             schema context
+     * @param uriInfo
+     *             uri info
+     * @return {@link NormalizedNode}
+     */
+    private static NormalizedNode<?, ?> readData(final String identifier, final String content,
+                                                final TransactionVarsWrapper transactionNode, final String withDefa,
+                                                final EffectiveModelContext schemaContext, final UriInfo uriInfo) {
+        if (identifier != null && identifier.contains(STREAMS_PATH) && !identifier.contains(STREAM_PATH_PART)) {
+            createAllYangNotificationStreams(transactionNode, schemaContext, uriInfo);
+        }
+        return ReadDataTransactionUtil.readData(content, transactionNode, withDefa, schemaContext);
+    }
+
+    private static void createAllYangNotificationStreams(final TransactionVarsWrapper transactionNode,
+            final EffectiveModelContext schemaContext, final UriInfo uriInfo) {
+        final DOMDataTreeReadWriteTransaction wTx = transactionNode.getTransactionChain().newReadWriteTransaction();
+        final boolean exist = SubscribeToStreamUtil.checkExist(schemaContext, wTx);
+
+        for (final NotificationDefinition notificationDefinition : schemaContext.getNotifications()) {
+            final NotificationListenerAdapter notifiStreamXML =
+                    CreateStreamUtil.createYangNotifiStream(notificationDefinition, schemaContext,
+                            NotificationOutputType.XML);
+            final NotificationListenerAdapter notifiStreamJSON =
+                    CreateStreamUtil.createYangNotifiStream(notificationDefinition, schemaContext,
+                            NotificationOutputType.JSON);
+            writeNotificationStreamToDatastore(schemaContext, uriInfo, wTx, exist, notifiStreamXML);
+            writeNotificationStreamToDatastore(schemaContext, uriInfo, wTx, exist, notifiStreamJSON);
+        }
+        SubscribeToStreamUtil.submitData(wTx);
+    }
+
+    private static void writeNotificationStreamToDatastore(final EffectiveModelContext schemaContext,
+            final UriInfo uriInfo, final DOMDataTreeReadWriteTransaction readWriteTransaction, final boolean exist,
+            final NotificationListenerAdapter listener) {
+        final URI uri = SubscribeToStreamUtil.prepareUriByStreamName(uriInfo, listener.getStreamName());
+        final NormalizedNode<?, ?> mapToStreams =
+                RestconfMappingNodeUtil.mapYangNotificationStreamByIetfRestconfMonitoring(
+                    listener.getSchemaPath().getLastComponent(), schemaContext.getNotifications(), null,
+                    listener.getOutputType(), uri, SubscribeToStreamUtil.getMonitoringModule(schemaContext), exist);
+        SubscribeToStreamUtil.writeDataToDS(schemaContext,
+                listener.getSchemaPath().getLastComponent().getLocalName(), readWriteTransaction, exist, mapToStreams);
+    }
+
     @Override
     public Response putData(final String identifier, final NormalizedNodeContext payload, final UriInfo uriInfo) {
         requireNonNull(payload);