Netconf Device Notification
[netconf.git] / restconf / restconf-nb / src / main / java / org / opendaylight / restconf / nb / rfc8040 / rests / services / impl / CreateStreamUtil.java
index 11bc447fada389b566520554ebed7ca8a72bf777..17c22a45f9468157da050b6bcfb29a0ae2402fad 100644 (file)
@@ -10,17 +10,27 @@ package org.opendaylight.restconf.nb.rfc8040.rests.services.impl;
 import static java.util.Objects.requireNonNull;
 
 import com.google.common.collect.ImmutableSet;
+import java.util.Collection;
 import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Collectors;
 import org.eclipse.jdt.annotation.Nullable;
 import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
+import org.opendaylight.mdsal.dom.api.DOMMountPoint;
+import org.opendaylight.mdsal.dom.api.DOMMountPointService;
+import org.opendaylight.mdsal.dom.api.DOMNotificationService;
 import org.opendaylight.mdsal.dom.api.DOMRpcResult;
+import org.opendaylight.mdsal.dom.api.DOMSchemaService;
 import org.opendaylight.mdsal.dom.spi.DefaultDOMRpcResult;
 import org.opendaylight.restconf.common.errors.RestconfDocumentedException;
 import org.opendaylight.restconf.nb.rfc8040.legacy.NormalizedNodePayload;
 import org.opendaylight.restconf.nb.rfc8040.rests.utils.RestconfStreamsConstants;
+import org.opendaylight.restconf.nb.rfc8040.streams.listeners.DeviceNotificationListenerAdaptor;
 import org.opendaylight.restconf.nb.rfc8040.streams.listeners.ListenersBroker;
 import org.opendaylight.restconf.nb.rfc8040.streams.listeners.NotificationListenerAdapter;
 import org.opendaylight.restconf.nb.rfc8040.utils.parser.IdentifierCodec;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.device.notification.rev221106.SubscribeDeviceNotificationInput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.device.notification.rev221106.SubscribeDeviceNotificationOutput;
 import org.opendaylight.yang.gen.v1.urn.sal.restconf.event.subscription.rev140708.CreateDataChangeEventSubscriptionInput1.Scope;
 import org.opendaylight.yang.gen.v1.urn.sal.restconf.event.subscription.rev140708.NotificationOutputTypeGrouping;
 import org.opendaylight.yang.gen.v1.urn.sal.restconf.event.subscription.rev140708.NotificationOutputTypeGrouping.NotificationOutputType;
@@ -50,16 +60,23 @@ import org.slf4j.LoggerFactory;
 final class CreateStreamUtil {
     private static final Logger LOG = LoggerFactory.getLogger(CreateStreamUtil.class);
     private static final QNameModule SAL_REMOTE_AUGMENT = NotificationOutputTypeGrouping.QNAME.getModule();
+
+    private static final QNameModule DEVICE_NOTIFICATION_MODULE = SubscribeDeviceNotificationInput.QNAME.getModule();
     private static final QName DATASTORE_QNAME =
         QName.create(SAL_REMOTE_AUGMENT, RestconfStreamsConstants.DATASTORE_PARAM_NAME).intern();
     private static final QName SCOPE_QNAME =
         QName.create(SAL_REMOTE_AUGMENT, RestconfStreamsConstants.SCOPE_PARAM_NAME).intern();
     private static final QName OUTPUT_TYPE_QNAME =
         QName.create(SAL_REMOTE_AUGMENT, "notification-output-type").intern();
+    private static final QName DEVICE_NOTIFICATION_PATH_QNAME =
+        QName.create(DEVICE_NOTIFICATION_MODULE, "path").intern();
+    private static final QName DEVICE_NOTIFICATION_STREAM_PATH =
+        QName.create(DEVICE_NOTIFICATION_PATH_QNAME, "stream-path").intern();
     private static final NodeIdentifier DATASTORE_NODEID = NodeIdentifier.create(DATASTORE_QNAME);
     private static final NodeIdentifier SCOPE_NODEID = NodeIdentifier.create(SCOPE_QNAME);
     private static final NodeIdentifier OUTPUT_TYPE_NODEID = NodeIdentifier.create(OUTPUT_TYPE_QNAME);
-
+    private static final NodeIdentifier DEVICE_NOTIFICATION_PATH_NODEID =
+        NodeIdentifier.create(DEVICE_NOTIFICATION_PATH_QNAME);
     private static final AugmentationIdentifier SAL_REMOTE_AUG_IDENTIFIER = new AugmentationIdentifier(
         ImmutableSet.of(SCOPE_QNAME, DATASTORE_QNAME, OUTPUT_TYPE_QNAME));
 
@@ -123,6 +140,64 @@ final class CreateStreamUtil {
             .build());
     }
 
+    /**
+     * Create device notification stream.
+     *
+     * @param baseUrl base Url
+     * @param payload data
+     * @param refSchemaCtx Reference to {@link EffectiveModelContext}.
+     * @param streamUtil stream utility
+     * @param mountPointService dom mount point service
+     * @return {@link DOMRpcResult} - Output of RPC - example in JSON
+     */
+    static DOMRpcResult createDeviceNotificationListener(final String baseUrl, final NormalizedNodePayload payload,
+            final EffectiveModelContext refSchemaCtx, final SubscribeToStreamUtil streamUtil,
+            final DOMMountPointService mountPointService) {
+        // parsing out of container with settings and path
+        final ContainerNode data = (ContainerNode) requireNonNull(payload).getData();
+        final YangInstanceIdentifier value =
+            (YangInstanceIdentifier) data.findChildByArg(DEVICE_NOTIFICATION_PATH_NODEID)
+            .map(DataContainerChild::body)
+            .orElseThrow(() -> new RestconfDocumentedException("Mount point not available", ErrorType.APPLICATION,
+                ErrorTag.OPERATION_FAILED));
+        final DOMMountPoint mountPoint = mountPointService.getMountPoint(value)
+            .orElseThrow(() -> new RestconfDocumentedException("Mount point not available", ErrorType.APPLICATION,
+                ErrorTag.OPERATION_FAILED));
+
+        final String deviceName = extractDeviceName(value);
+        final NotificationOutputType outputType = prepareOutputType(data);
+        EffectiveModelContext effectiveModelContext = mountPoint.getService(DOMSchemaService.class).get()
+            .getGlobalContext();
+        Collection<? extends NotificationDefinition> notificationDefinitions = mountPoint.getService(
+                DOMSchemaService.class).get().getGlobalContext()
+            .getNotifications();
+        if (notificationDefinitions == null || notificationDefinitions.isEmpty()) {
+            throw new RestconfDocumentedException("Device does not support notification", ErrorType.APPLICATION,
+                ErrorTag.OPERATION_FAILED);
+        }
+
+        Set<Absolute> absolutes = notificationDefinitions.stream()
+            .map(notificationDefinition -> Absolute.of(notificationDefinition.getQName()))
+            .collect(Collectors.toUnmodifiableSet());
+
+        final DeviceNotificationListenerAdaptor notificationListenerAdapter = ListenersBroker.getInstance()
+            .registerDeviceNotificationListener(deviceName, outputType, effectiveModelContext, mountPointService,
+                mountPoint.getIdentifier());
+        notificationListenerAdapter.listen(mountPoint.getService(DOMNotificationService.class).get(), absolutes);
+
+        // building of output
+        return new DefaultDOMRpcResult(Builders.containerBuilder()
+            .withNodeIdentifier(new NodeIdentifier(SubscribeDeviceNotificationOutput.QNAME))
+            .withChild(ImmutableNodes.leafNode(DEVICE_NOTIFICATION_STREAM_PATH, baseUrl + deviceName
+                + "?" + RestconfStreamsConstants.NOTIFICATION_TYPE + "=" + RestconfStreamsConstants.DEVICE))
+            .build());
+    }
+
+    private static String extractDeviceName(final YangInstanceIdentifier iid) {
+        return ((YangInstanceIdentifier.NodeIdentifierWithPredicates.Singleton)iid.getLastPathArgument())
+            .values().getElement().toString();
+    }
+
     /**
      * Prepare {@link NotificationOutputType}.
      *