Remove SchemaContextRef
[netconf.git] / restconf / restconf-nb-rfc8040 / src / main / java / org / opendaylight / restconf / nb / rfc8040 / rests / utils / CreateStreamUtil.java
index 674d189ed53551f88f923b4e2a9c750103645cff..4012c8c62f6c3539f280473dd3d086742f35fcb3 100644 (file)
@@ -7,22 +7,19 @@
  */
 package org.opendaylight.restconf.nb.rfc8040.rests.utils;
 
-import com.google.common.base.Preconditions;
-import com.google.common.util.concurrent.CheckedFuture;
-import java.util.ArrayList;
-import java.util.List;
+import static java.util.Objects.requireNonNull;
+
 import java.util.Optional;
-import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataChangeScope;
-import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
-import org.opendaylight.controller.md.sal.dom.api.DOMRpcResult;
-import org.opendaylight.controller.md.sal.dom.spi.DefaultDOMRpcResult;
+import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
+import org.opendaylight.mdsal.dom.api.DOMRpcResult;
+import org.opendaylight.mdsal.dom.spi.DefaultDOMRpcResult;
 import org.opendaylight.restconf.common.context.NormalizedNodeContext;
 import org.opendaylight.restconf.common.errors.RestconfDocumentedException;
 import org.opendaylight.restconf.common.errors.RestconfError.ErrorTag;
 import org.opendaylight.restconf.common.errors.RestconfError.ErrorType;
-import org.opendaylight.restconf.nb.rfc8040.references.SchemaContextRef;
+import org.opendaylight.restconf.common.util.DataChangeScope;
+import org.opendaylight.restconf.nb.rfc8040.streams.listeners.ListenersBroker;
 import org.opendaylight.restconf.nb.rfc8040.streams.listeners.NotificationListenerAdapter;
-import org.opendaylight.restconf.nb.rfc8040.streams.listeners.Notificator;
 import org.opendaylight.restconf.nb.rfc8040.utils.parser.ParserIdentifier;
 import org.opendaylight.yang.gen.v1.urn.sal.restconf.event.subscription.rev140708.NotificationOutputTypeGrouping.NotificationOutputType;
 import org.opendaylight.yangtools.yang.common.QName;
@@ -34,123 +31,157 @@ import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
 import org.opendaylight.yangtools.yang.data.api.schema.DataContainerChild;
 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
 import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableContainerNodeBuilder;
+import org.opendaylight.yangtools.yang.model.api.EffectiveModelContext;
 import org.opendaylight.yangtools.yang.model.api.Module;
 import org.opendaylight.yangtools.yang.model.api.NotificationDefinition;
 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
-import org.opendaylight.yangtools.yang.model.api.SchemaPath;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
- * Util class for streams.
- *
- * <ul>
- * <li>create stream
- * <li>subscribe
- * </ul>
- *
+ * Utility class for creation of data-change-event or YANG notification streams.
  */
 public final class CreateStreamUtil {
 
     private static final Logger LOG = LoggerFactory.getLogger(CreateStreamUtil.class);
-    private static final String OUTPUT_TYPE_PARAM_NAME = "notification-output-type";
 
     private CreateStreamUtil() {
-        throw new UnsupportedOperationException("Util class");
+        throw new UnsupportedOperationException("Utility class");
     }
 
     /**
-     * Create stream with POST operation via RPC.
-     *
-     * @param payload
-     *             input of rpc - example in JSON:
-     *
-     *            <pre>
-     *            {@code
-     *            {
-     *                "input": {
-     *                    "path": "/toaster:toaster/toaster:toasterStatus",
-     *                    "sal-remote-augment:datastore": "OPERATIONAL",
-     *                    "sal-remote-augment:scope": "ONE"
-     *                }
-     *            }
-     *            }
-     *            </pre>
-     *
-     * @param refSchemaCtx
-     *             reference to {@link SchemaContext} -
-     *            {@link SchemaContextRef}
-     * @return {@link CheckedFuture} with {@link DOMRpcResult} - This mean
-     *         output of RPC - example in JSON:
+     * Create data-change-event or notification stream with POST operation via RPC.
      *
-     *         <pre>
-     *         {@code
+     * @param payload      Input of RPC - example in JSON (data-change-event stream):
+     *                     <pre>
+     *                     {@code
+     *                         {
+     *                             "input": {
+     *                                 "path": "/toaster:toaster/toaster:toasterStatus",
+     *                                 "sal-remote-augment:datastore": "OPERATIONAL",
+     *                                 "sal-remote-augment:scope": "ONE"
+     *                             }
+     *                         }
+     *                     }
+     *                     </pre>
+     * @param refSchemaCtx Reference to {@link EffectiveModelContext}.
+     * @return {@link DOMRpcResult} - Output of RPC - example in JSON:
+     *     <pre>
+     *     {@code
      *         {
      *             "output": {
      *                 "stream-name": "toaster:toaster/toaster:toasterStatus/datastore=OPERATIONAL/scope=ONE"
      *             }
      *         }
-     *         }
-     *         </pre>
-     *
+     *     }
+     *     </pre>
      */
     public static DOMRpcResult createDataChangeNotifiStream(final NormalizedNodeContext payload,
-            final SchemaContextRef refSchemaCtx) {
-        final ContainerNode data = (ContainerNode) payload.getData();
+            final EffectiveModelContext refSchemaCtx) {
+        // parsing out of container with settings and path
+        final ContainerNode data = (ContainerNode) requireNonNull(payload).getData();
         final QName qname = payload.getInstanceIdentifierContext().getSchemaNode().getQName();
         final YangInstanceIdentifier path = preparePath(data, qname);
-        String streamName = prepareDataChangeNotifiStreamName(path, refSchemaCtx.get(), data);
-
-        final QName outputQname = QName.create(qname, "output");
-        final QName streamNameQname = QName.create(qname, "stream-name");
 
+        // building of stream name
+        final StringBuilder streamNameBuilder = new StringBuilder(
+                prepareDataChangeNotifiStreamName(path, requireNonNull(refSchemaCtx), data));
         final NotificationOutputType outputType = prepareOutputType(data);
         if (outputType.equals(NotificationOutputType.JSON)) {
-            streamName = streamName + "/JSON";
+            streamNameBuilder.append('/').append(outputType.getName());
         }
+        final String streamName = streamNameBuilder.toString();
 
-        if (!Notificator.existListenerFor(streamName)) {
-            Notificator.createListener(path, streamName, outputType);
-        }
+        // registration of the listener
+        ListenersBroker.getInstance().registerDataChangeListener(path, streamName, outputType);
+
+        // building of output
+        final QName outputQname = QName.create(qname, RestconfStreamsConstants.OUTPUT_CONTAINER_NAME);
+        final QName streamNameQname = QName.create(qname, RestconfStreamsConstants.OUTPUT_STREAM_NAME);
 
-        final ContainerNode output =
-                ImmutableContainerNodeBuilder.create().withNodeIdentifier(new NodeIdentifier(outputQname))
-                        .withChild(ImmutableNodes.leafNode(streamNameQname, streamName)).build();
+        final ContainerNode output = ImmutableContainerNodeBuilder.create()
+                .withNodeIdentifier(new NodeIdentifier(outputQname))
+                .withChild(ImmutableNodes.leafNode(streamNameQname, streamName)).build();
         return new DefaultDOMRpcResult(output);
     }
 
     /**
-     * Prepare {@code NotificationOutputType}.
+     * Prepare {@link NotificationOutputType}.
      *
-     * @param data
-     *             data of notification
-     * @return output type fo notification
+     * @param data Container with stream settings (RPC create-stream).
+     * @return Parsed {@link NotificationOutputType}.
      */
     private static NotificationOutputType prepareOutputType(final ContainerNode data) {
-        NotificationOutputType outputType = parseEnum(data, NotificationOutputType.class, OUTPUT_TYPE_PARAM_NAME);
+        NotificationOutputType outputType = parseEnum(
+                data, NotificationOutputType.class, RestconfStreamsConstants.OUTPUT_TYPE_PARAM_NAME);
         return outputType == null ? NotificationOutputType.XML : outputType;
     }
 
+    /**
+     * Prepare stream name.
+     *
+     * @param path          Path of element from which data-change-event notifications are going to be generated.
+     * @param schemaContext Schema context.
+     * @param data          Container with stream settings (RPC create-stream).
+     * @return Parsed stream name.
+     */
     private static String prepareDataChangeNotifiStreamName(final YangInstanceIdentifier path,
-                                                            final SchemaContext schemaContext,
-            final ContainerNode data) {
-        LogicalDatastoreType ds = parseEnum(data, LogicalDatastoreType.class,
-                RestconfStreamsConstants.DATASTORE_PARAM_NAME);
-        ds = ds == null ? RestconfStreamsConstants.DEFAULT_DS : ds;
+            final SchemaContext schemaContext, final ContainerNode data) {
+        LogicalDatastoreType datastoreType = parseEnum(
+                data, LogicalDatastoreType.class, RestconfStreamsConstants.DATASTORE_PARAM_NAME);
+        datastoreType = datastoreType == null ? RestconfStreamsConstants.DEFAULT_DS : datastoreType;
 
         DataChangeScope scope = parseEnum(data, DataChangeScope.class, RestconfStreamsConstants.SCOPE_PARAM_NAME);
         scope = scope == null ? RestconfStreamsConstants.DEFAULT_SCOPE : scope;
 
-        final String streamName = RestconfStreamsConstants.DATA_SUBSCR + "/"
-                + Notificator
-                .createStreamNameFromUri(ParserIdentifier.stringFromYangInstanceIdentifier(path, schemaContext)
-                + RestconfStreamsConstants.DS_URI + ds + RestconfStreamsConstants.SCOPE_URI + scope);
-        return streamName;
+        return RestconfStreamsConstants.DATA_SUBSCRIPTION
+                + "/"
+                + ListenersBroker.createStreamNameFromUri(
+                ParserIdentifier.stringFromYangInstanceIdentifier(path, schemaContext)
+                        + RestconfStreamsConstants.DS_URI
+                        + datastoreType
+                        + RestconfStreamsConstants.SCOPE_URI
+                        + scope);
+    }
+
+    /**
+     * Prepare {@link YangInstanceIdentifier} of stream source.
+     *
+     * @param data          Container with stream settings (RPC create-stream).
+     * @param qualifiedName QName of the input RPC context (used only in debugging).
+     * @return Parsed {@link YangInstanceIdentifier} of data element from which the data-change-event notifications
+     *     are going to be generated.
+     */
+    private static YangInstanceIdentifier preparePath(final ContainerNode data, final QName qualifiedName) {
+        final Optional<DataContainerChild<? extends PathArgument, ?>> path = data.getChild(
+                new YangInstanceIdentifier.NodeIdentifier(QName.create(
+                        qualifiedName,
+                        RestconfStreamsConstants.STREAM_PATH_PARAM_NAME)));
+        Object pathValue = null;
+        if (path.isPresent()) {
+            pathValue = path.get().getValue();
+        }
+        if (!(pathValue instanceof YangInstanceIdentifier)) {
+            LOG.debug("Instance identifier {} was not normalized correctly", qualifiedName);
+            throw new RestconfDocumentedException(
+                    "Instance identifier was not normalized correctly",
+                    ErrorType.APPLICATION,
+                    ErrorTag.OPERATION_FAILED);
+        }
+        return (YangInstanceIdentifier) pathValue;
     }
 
+    /**
+     * Parsing out of enumeration from RPC create-stream body.
+     *
+     * @param data      Container with stream settings (RPC create-stream).
+     * @param clazz     Enum type to be parsed out from input container.
+     * @param paramName Local name of the enum element.
+     * @return Parsed enumeration.
+     */
     private static <T> T parseEnum(final ContainerNode data, final Class<T> clazz, final String paramName) {
         final Optional<DataContainerChild<? extends PathArgument, ?>> optAugNode = data.getChild(
-            RestconfStreamsConstants.SAL_REMOTE_AUG_IDENTIFIER);
+                RestconfStreamsConstants.SAL_REMOTE_AUG_IDENTIFIER);
         if (!optAugNode.isPresent()) {
             return null;
         }
@@ -159,7 +190,7 @@ public final class CreateStreamUtil {
             return null;
         }
         final Optional<DataContainerChild<? extends PathArgument, ?>> enumNode = ((AugmentationNode) augNode).getChild(
-            new NodeIdentifier(QName.create(RestconfStreamsConstants.SAL_REMOTE_AUGMENT, paramName)));
+                new NodeIdentifier(QName.create(RestconfStreamsConstants.SAL_REMOTE_AUGMENT, paramName)));
         if (!enumNode.isPresent()) {
             return null;
         }
@@ -171,65 +202,43 @@ public final class CreateStreamUtil {
         return ResolveEnumUtil.resolveEnum(clazz, (String) value);
     }
 
-    private static YangInstanceIdentifier preparePath(final ContainerNode data, final QName qualifiedName) {
-        final Optional<DataContainerChild<? extends PathArgument, ?>> path = data
-                .getChild(new YangInstanceIdentifier.NodeIdentifier(QName.create(qualifiedName, "path")));
-        Object pathValue = null;
-        if (path.isPresent()) {
-            pathValue = path.get().getValue();
-        }
-        if (!(pathValue instanceof YangInstanceIdentifier)) {
-            final String errMsg = "Instance identifier was not normalized correctly ";
-            LOG.debug(errMsg + qualifiedName);
-            throw new RestconfDocumentedException(errMsg, ErrorType.APPLICATION, ErrorTag.OPERATION_FAILED);
-        }
-        return (YangInstanceIdentifier) pathValue;
-    }
-
     /**
-     * Create stream with POST operation via RPC.
+     * Create YANG notification stream using notification definition in YANG schema.
      *
-     * @param notificatinoDefinition
-     *             input of RPC
-     * @param refSchemaCtx
-     *             schemaContext
-     * @param outputType
-     *              output type
-     * @return {@link DOMRpcResult}
+     * @param notificationDefinition YANG notification definition.
+     * @param refSchemaCtx           Reference to {@link EffectiveModelContext}
+     * @param outputType             Output type (XML or JSON).
+     * @return {@link NotificationListenerAdapter}
      */
-    public static List<NotificationListenerAdapter> createYangNotifiStream(
-            final NotificationDefinition notificatinoDefinition, final SchemaContextRef refSchemaCtx,
-            final String outputType) {
-        final List<SchemaPath> paths = new ArrayList<>();
-        final QName notificatinoDefinitionQName = notificatinoDefinition.getQName();
-        final Module module =
-                refSchemaCtx.findModuleByNamespaceAndRevision(notificatinoDefinitionQName.getModule().getNamespace(),
-                        notificatinoDefinitionQName.getModule().getRevision());
-        Preconditions.checkNotNull(module,
-                "Module for namespace " + notificatinoDefinitionQName.getModule().getNamespace() + " does not exist");
-        NotificationDefinition notifiDef = null;
-        for (final NotificationDefinition notification : module.getNotifications()) {
-            if (notification.getQName().equals(notificatinoDefinitionQName)) {
-                notifiDef = notification;
-                break;
-            }
-        }
-        final String moduleName = module.getName();
-        Preconditions.checkNotNull(notifiDef,
-                "Notification " + notificatinoDefinitionQName + "doesn't exist in module " + moduleName);
-        paths.add(notifiDef.getPath());
-        String streamName = RestconfStreamsConstants.CREATE_NOTIFICATION_STREAM + "/";
-        streamName = streamName + moduleName + ":" + notificatinoDefinitionQName.getLocalName();
-        if (outputType.equals("JSON")) {
-            streamName = streamName + "/JSON";
-        }
+    public static NotificationListenerAdapter createYangNotifiStream(
+            final NotificationDefinition notificationDefinition, final EffectiveModelContext refSchemaCtx,
+            final NotificationOutputType outputType) {
+        final String streamName = parseNotificationStreamName(requireNonNull(notificationDefinition),
+                requireNonNull(refSchemaCtx), requireNonNull(outputType.getName()));
+        final Optional<NotificationListenerAdapter> listenerForStreamName = ListenersBroker.getInstance()
+                .getNotificationListenerFor(streamName);
+        return listenerForStreamName.orElseGet(() -> ListenersBroker.getInstance().registerNotificationListener(
+                notificationDefinition.getPath(), streamName, outputType));
+    }
 
-        if (Notificator.existNotificationListenerFor(streamName)) {
-            final List<NotificationListenerAdapter> notificationListenerFor =
-                    Notificator.getNotificationListenerFor(streamName);
-            return SubscribeToStreamUtil.pickSpecificListenerByOutput(notificationListenerFor, outputType);
+    private static String parseNotificationStreamName(final NotificationDefinition notificationDefinition,
+            final EffectiveModelContext refSchemaCtx, final String outputType) {
+        final QName notificationDefinitionQName = notificationDefinition.getQName();
+        final Module module = refSchemaCtx.findModule(
+                notificationDefinitionQName.getModule().getNamespace(),
+                notificationDefinitionQName.getModule().getRevision()).orElse(null);
+        requireNonNull(module, String.format("Module for namespace %s does not exist.",
+                notificationDefinitionQName.getModule().getNamespace()));
+
+        final StringBuilder streamNameBuilder = new StringBuilder();
+        streamNameBuilder.append(RestconfStreamsConstants.NOTIFICATION_STREAM)
+                .append('/')
+                .append(module.getName())
+                .append(':')
+                .append(notificationDefinitionQName.getLocalName());
+        if (outputType.equals(NotificationOutputType.JSON.getName())) {
+            streamNameBuilder.append('/').append(NotificationOutputType.JSON.getName());
         }
-
-        return Notificator.createNotificationListener(paths, streamName, outputType);
+        return streamNameBuilder.toString();
     }
-}
+}
\ No newline at end of file