import static java.util.Objects.requireNonNull;
import com.google.common.collect.ImmutableSet;
+import java.util.Collection;
import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Collectors;
import org.eclipse.jdt.annotation.Nullable;
import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
+import org.opendaylight.mdsal.dom.api.DOMMountPoint;
+import org.opendaylight.mdsal.dom.api.DOMMountPointService;
+import org.opendaylight.mdsal.dom.api.DOMNotificationService;
import org.opendaylight.mdsal.dom.api.DOMRpcResult;
+import org.opendaylight.mdsal.dom.api.DOMSchemaService;
import org.opendaylight.mdsal.dom.spi.DefaultDOMRpcResult;
import org.opendaylight.restconf.common.errors.RestconfDocumentedException;
import org.opendaylight.restconf.nb.rfc8040.legacy.NormalizedNodePayload;
import org.opendaylight.restconf.nb.rfc8040.rests.utils.RestconfStreamsConstants;
+import org.opendaylight.restconf.nb.rfc8040.streams.listeners.DeviceNotificationListenerAdaptor;
import org.opendaylight.restconf.nb.rfc8040.streams.listeners.ListenersBroker;
import org.opendaylight.restconf.nb.rfc8040.streams.listeners.NotificationListenerAdapter;
import org.opendaylight.restconf.nb.rfc8040.utils.parser.IdentifierCodec;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.device.notification.rev221106.SubscribeDeviceNotificationInput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.device.notification.rev221106.SubscribeDeviceNotificationOutput;
import org.opendaylight.yang.gen.v1.urn.sal.restconf.event.subscription.rev140708.CreateDataChangeEventSubscriptionInput1.Scope;
import org.opendaylight.yang.gen.v1.urn.sal.restconf.event.subscription.rev140708.NotificationOutputTypeGrouping;
import org.opendaylight.yang.gen.v1.urn.sal.restconf.event.subscription.rev140708.NotificationOutputTypeGrouping.NotificationOutputType;
final class CreateStreamUtil {
private static final Logger LOG = LoggerFactory.getLogger(CreateStreamUtil.class);
private static final QNameModule SAL_REMOTE_AUGMENT = NotificationOutputTypeGrouping.QNAME.getModule();
+
+ private static final QNameModule DEVICE_NOTIFICATION_MODULE = SubscribeDeviceNotificationInput.QNAME.getModule();
private static final QName DATASTORE_QNAME =
QName.create(SAL_REMOTE_AUGMENT, RestconfStreamsConstants.DATASTORE_PARAM_NAME).intern();
private static final QName SCOPE_QNAME =
QName.create(SAL_REMOTE_AUGMENT, RestconfStreamsConstants.SCOPE_PARAM_NAME).intern();
private static final QName OUTPUT_TYPE_QNAME =
QName.create(SAL_REMOTE_AUGMENT, "notification-output-type").intern();
+ private static final QName DEVICE_NOTIFICATION_PATH_QNAME =
+ QName.create(DEVICE_NOTIFICATION_MODULE, "path").intern();
+ private static final QName DEVICE_NOTIFICATION_STREAM_PATH =
+ QName.create(DEVICE_NOTIFICATION_PATH_QNAME, "stream-path").intern();
private static final NodeIdentifier DATASTORE_NODEID = NodeIdentifier.create(DATASTORE_QNAME);
private static final NodeIdentifier SCOPE_NODEID = NodeIdentifier.create(SCOPE_QNAME);
private static final NodeIdentifier OUTPUT_TYPE_NODEID = NodeIdentifier.create(OUTPUT_TYPE_QNAME);
-
+ private static final NodeIdentifier DEVICE_NOTIFICATION_PATH_NODEID =
+ NodeIdentifier.create(DEVICE_NOTIFICATION_PATH_QNAME);
private static final AugmentationIdentifier SAL_REMOTE_AUG_IDENTIFIER = new AugmentationIdentifier(
ImmutableSet.of(SCOPE_QNAME, DATASTORE_QNAME, OUTPUT_TYPE_QNAME));
.build());
}
+ /**
+ * Create device notification stream.
+ *
+ * @param baseUrl base Url
+ * @param payload data
+ * @param refSchemaCtx Reference to {@link EffectiveModelContext}.
+ * @param streamUtil stream utility
+ * @param mountPointService dom mount point service
+ * @return {@link DOMRpcResult} - Output of RPC - example in JSON
+ */
+ static DOMRpcResult createDeviceNotificationListener(final String baseUrl, final NormalizedNodePayload payload,
+ final EffectiveModelContext refSchemaCtx, final SubscribeToStreamUtil streamUtil,
+ final DOMMountPointService mountPointService) {
+ // parsing out of container with settings and path
+ final ContainerNode data = (ContainerNode) requireNonNull(payload).getData();
+ final YangInstanceIdentifier value =
+ (YangInstanceIdentifier) data.findChildByArg(DEVICE_NOTIFICATION_PATH_NODEID)
+ .map(DataContainerChild::body)
+ .orElseThrow(() -> new RestconfDocumentedException("Mount point not available", ErrorType.APPLICATION,
+ ErrorTag.OPERATION_FAILED));
+ final DOMMountPoint mountPoint = mountPointService.getMountPoint(value)
+ .orElseThrow(() -> new RestconfDocumentedException("Mount point not available", ErrorType.APPLICATION,
+ ErrorTag.OPERATION_FAILED));
+
+ final String deviceName = extractDeviceName(value);
+ final NotificationOutputType outputType = prepareOutputType(data);
+ EffectiveModelContext effectiveModelContext = mountPoint.getService(DOMSchemaService.class).get()
+ .getGlobalContext();
+ Collection<? extends NotificationDefinition> notificationDefinitions = mountPoint.getService(
+ DOMSchemaService.class).get().getGlobalContext()
+ .getNotifications();
+ if (notificationDefinitions == null || notificationDefinitions.isEmpty()) {
+ throw new RestconfDocumentedException("Device does not support notification", ErrorType.APPLICATION,
+ ErrorTag.OPERATION_FAILED);
+ }
+
+ Set<Absolute> absolutes = notificationDefinitions.stream()
+ .map(notificationDefinition -> Absolute.of(notificationDefinition.getQName()))
+ .collect(Collectors.toUnmodifiableSet());
+
+ final DeviceNotificationListenerAdaptor notificationListenerAdapter = ListenersBroker.getInstance()
+ .registerDeviceNotificationListener(deviceName, outputType, effectiveModelContext, mountPointService,
+ mountPoint.getIdentifier());
+ notificationListenerAdapter.listen(mountPoint.getService(DOMNotificationService.class).get(), absolutes);
+
+ // building of output
+ return new DefaultDOMRpcResult(Builders.containerBuilder()
+ .withNodeIdentifier(new NodeIdentifier(SubscribeDeviceNotificationOutput.QNAME))
+ .withChild(ImmutableNodes.leafNode(DEVICE_NOTIFICATION_STREAM_PATH, baseUrl + deviceName
+ + "?" + RestconfStreamsConstants.NOTIFICATION_TYPE + "=" + RestconfStreamsConstants.DEVICE))
+ .build());
+ }
+
+ private static String extractDeviceName(final YangInstanceIdentifier iid) {
+ return ((YangInstanceIdentifier.NodeIdentifierWithPredicates.Singleton)iid.getLastPathArgument())
+ .values().getElement().toString();
+ }
+
/**
* Prepare {@link NotificationOutputType}.
*