*/
package org.opendaylight.restconf.nb.rfc8040.rests.utils;
-import com.google.common.base.Preconditions;
-import java.util.ArrayList;
-import java.util.List;
+import static java.util.Objects.requireNonNull;
+
import java.util.Optional;
import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
import org.opendaylight.mdsal.dom.api.DOMRpcResult;
import org.opendaylight.restconf.common.errors.RestconfError.ErrorType;
import org.opendaylight.restconf.common.util.DataChangeScope;
import org.opendaylight.restconf.nb.rfc8040.references.SchemaContextRef;
+import org.opendaylight.restconf.nb.rfc8040.streams.listeners.ListenersBroker;
import org.opendaylight.restconf.nb.rfc8040.streams.listeners.NotificationListenerAdapter;
-import org.opendaylight.restconf.nb.rfc8040.streams.listeners.Notificator;
import org.opendaylight.restconf.nb.rfc8040.utils.parser.ParserIdentifier;
import org.opendaylight.yang.gen.v1.urn.sal.restconf.event.subscription.rev140708.NotificationOutputTypeGrouping.NotificationOutputType;
import org.opendaylight.yangtools.yang.common.QName;
import org.opendaylight.yangtools.yang.model.api.Module;
import org.opendaylight.yangtools.yang.model.api.NotificationDefinition;
import org.opendaylight.yangtools.yang.model.api.SchemaContext;
-import org.opendaylight.yangtools.yang.model.api.SchemaPath;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
- * Util class for streams.
- *
- * <ul>
- * <li>create stream
- * <li>subscribe
- * </ul>
- *
+ * Utility class for creation of data-change-event or YANG notification streams.
*/
public final class CreateStreamUtil {
private static final Logger LOG = LoggerFactory.getLogger(CreateStreamUtil.class);
- private static final String OUTPUT_TYPE_PARAM_NAME = "notification-output-type";
private CreateStreamUtil() {
- throw new UnsupportedOperationException("Util class");
+ throw new UnsupportedOperationException("Utility class");
}
/**
- * Create stream with POST operation via RPC.
- *
- * @param payload
- * input of rpc - example in JSON:
- *
- * <pre>
- * {@code
- * {
- * "input": {
- * "path": "/toaster:toaster/toaster:toasterStatus",
- * "sal-remote-augment:datastore": "OPERATIONAL",
- * "sal-remote-augment:scope": "ONE"
- * }
- * }
- * }
- * </pre>
- *
- * @param refSchemaCtx
- * reference to {@link SchemaContext} -
- * {@link SchemaContextRef}
- * @return {@link DOMRpcResult} - This means output of RPC - example in JSON:
+ * Create data-change-event or notification stream with POST operation via RPC.
*
- * <pre>
- * {@code
+ * @param payload Input of RPC - example in JSON (data-change-event stream):
+ * <pre>
+ * {@code
+ * {
+ * "input": {
+ * "path": "/toaster:toaster/toaster:toasterStatus",
+ * "sal-remote-augment:datastore": "OPERATIONAL",
+ * "sal-remote-augment:scope": "ONE"
+ * }
+ * }
+ * }
+ * </pre>
+ * @param refSchemaCtx Reference to {@link SchemaContext} - {@link SchemaContextRef}.
+ * @return {@link DOMRpcResult} - Output of RPC - example in JSON:
+ * <pre>
+ * {@code
* {
* "output": {
* "stream-name": "toaster:toaster/toaster:toasterStatus/datastore=OPERATIONAL/scope=ONE"
* }
* }
- * }
- * </pre>
- *
+ * }
+ * </pre>
*/
public static DOMRpcResult createDataChangeNotifiStream(final NormalizedNodeContext payload,
final SchemaContextRef refSchemaCtx) {
- final ContainerNode data = (ContainerNode) payload.getData();
+ // parsing out of container with settings and path
+ final ContainerNode data = (ContainerNode) requireNonNull(payload).getData();
final QName qname = payload.getInstanceIdentifierContext().getSchemaNode().getQName();
final YangInstanceIdentifier path = preparePath(data, qname);
- String streamName = prepareDataChangeNotifiStreamName(path, refSchemaCtx.get(), data);
-
- final QName outputQname = QName.create(qname, "output");
- final QName streamNameQname = QName.create(qname, "stream-name");
+ // building of stream name
+ final StringBuilder streamNameBuilder = new StringBuilder(
+ prepareDataChangeNotifiStreamName(path, requireNonNull(refSchemaCtx).get(), data));
final NotificationOutputType outputType = prepareOutputType(data);
if (outputType.equals(NotificationOutputType.JSON)) {
- streamName = streamName + "/JSON";
+ streamNameBuilder.append('/').append(outputType.getName());
}
+ final String streamName = streamNameBuilder.toString();
- if (!Notificator.existListenerFor(streamName)) {
- Notificator.createListener(path, streamName, outputType);
- }
+ // registration of the listener
+ ListenersBroker.getInstance().registerDataChangeListener(path, streamName, outputType);
+
+ // building of output
+ final QName outputQname = QName.create(qname, RestconfStreamsConstants.OUTPUT_CONTAINER_NAME);
+ final QName streamNameQname = QName.create(qname, RestconfStreamsConstants.OUTPUT_STREAM_NAME);
- final ContainerNode output =
- ImmutableContainerNodeBuilder.create().withNodeIdentifier(new NodeIdentifier(outputQname))
- .withChild(ImmutableNodes.leafNode(streamNameQname, streamName)).build();
+ final ContainerNode output = ImmutableContainerNodeBuilder.create()
+ .withNodeIdentifier(new NodeIdentifier(outputQname))
+ .withChild(ImmutableNodes.leafNode(streamNameQname, streamName)).build();
return new DefaultDOMRpcResult(output);
}
/**
- * Prepare {@code NotificationOutputType}.
+ * Prepare {@link NotificationOutputType}.
*
- * @param data
- * data of notification
- * @return output type fo notification
+ * @param data Container with stream settings (RPC create-stream).
+ * @return Parsed {@link NotificationOutputType}.
*/
private static NotificationOutputType prepareOutputType(final ContainerNode data) {
- NotificationOutputType outputType = parseEnum(data, NotificationOutputType.class, OUTPUT_TYPE_PARAM_NAME);
+ NotificationOutputType outputType = parseEnum(
+ data, NotificationOutputType.class, RestconfStreamsConstants.OUTPUT_TYPE_PARAM_NAME);
return outputType == null ? NotificationOutputType.XML : outputType;
}
+ /**
+ * Prepare stream name.
+ *
+ * @param path Path of element from which data-change-event notifications are going to be generated.
+ * @param schemaContext Schema context.
+ * @param data Container with stream settings (RPC create-stream).
+ * @return Parsed stream name.
+ */
private static String prepareDataChangeNotifiStreamName(final YangInstanceIdentifier path,
- final SchemaContext schemaContext,
- final ContainerNode data) {
- LogicalDatastoreType ds = parseEnum(data, LogicalDatastoreType.class,
- RestconfStreamsConstants.DATASTORE_PARAM_NAME);
- ds = ds == null ? RestconfStreamsConstants.DEFAULT_DS : ds;
+ final SchemaContext schemaContext, final ContainerNode data) {
+ LogicalDatastoreType datastoreType = parseEnum(
+ data, LogicalDatastoreType.class, RestconfStreamsConstants.DATASTORE_PARAM_NAME);
+ datastoreType = datastoreType == null ? RestconfStreamsConstants.DEFAULT_DS : datastoreType;
DataChangeScope scope = parseEnum(data, DataChangeScope.class, RestconfStreamsConstants.SCOPE_PARAM_NAME);
scope = scope == null ? RestconfStreamsConstants.DEFAULT_SCOPE : scope;
- final String streamName = RestconfStreamsConstants.DATA_SUBSCR + "/"
- + Notificator
- .createStreamNameFromUri(ParserIdentifier.stringFromYangInstanceIdentifier(path, schemaContext)
- + RestconfStreamsConstants.DS_URI + ds + RestconfStreamsConstants.SCOPE_URI + scope);
- return streamName;
+ return RestconfStreamsConstants.DATA_SUBSCRIPTION
+ + "/"
+ + ListenersBroker.createStreamNameFromUri(
+ ParserIdentifier.stringFromYangInstanceIdentifier(path, schemaContext)
+ + RestconfStreamsConstants.DS_URI
+ + datastoreType
+ + RestconfStreamsConstants.SCOPE_URI
+ + scope);
+ }
+
+ /**
+ * Prepare {@link YangInstanceIdentifier} of stream source.
+ *
+ * @param data Container with stream settings (RPC create-stream).
+ * @param qualifiedName QName of the input RPC context (used only in debugging).
+ * @return Parsed {@link YangInstanceIdentifier} of data element from which the data-change-event notifications
+ * are going to be generated.
+ */
+ private static YangInstanceIdentifier preparePath(final ContainerNode data, final QName qualifiedName) {
+ final Optional<DataContainerChild<? extends PathArgument, ?>> path = data.getChild(
+ new YangInstanceIdentifier.NodeIdentifier(QName.create(
+ qualifiedName,
+ RestconfStreamsConstants.STREAM_PATH_PARAM_NAME)));
+ Object pathValue = null;
+ if (path.isPresent()) {
+ pathValue = path.get().getValue();
+ }
+ if (!(pathValue instanceof YangInstanceIdentifier)) {
+ LOG.debug("Instance identifier {} was not normalized correctly", qualifiedName);
+ throw new RestconfDocumentedException(
+ "Instance identifier was not normalized correctly",
+ ErrorType.APPLICATION,
+ ErrorTag.OPERATION_FAILED);
+ }
+ return (YangInstanceIdentifier) pathValue;
}
+ /**
+ * Parsing out of enumeration from RPC create-stream body.
+ *
+ * @param data Container with stream settings (RPC create-stream).
+ * @param clazz Enum type to be parsed out from input container.
+ * @param paramName Local name of the enum element.
+ * @return Parsed enumeration.
+ */
private static <T> T parseEnum(final ContainerNode data, final Class<T> clazz, final String paramName) {
final Optional<DataContainerChild<? extends PathArgument, ?>> optAugNode = data.getChild(
- RestconfStreamsConstants.SAL_REMOTE_AUG_IDENTIFIER);
+ RestconfStreamsConstants.SAL_REMOTE_AUG_IDENTIFIER);
if (!optAugNode.isPresent()) {
return null;
}
return null;
}
final Optional<DataContainerChild<? extends PathArgument, ?>> enumNode = ((AugmentationNode) augNode).getChild(
- new NodeIdentifier(QName.create(RestconfStreamsConstants.SAL_REMOTE_AUGMENT, paramName)));
+ new NodeIdentifier(QName.create(RestconfStreamsConstants.SAL_REMOTE_AUGMENT, paramName)));
if (!enumNode.isPresent()) {
return null;
}
return ResolveEnumUtil.resolveEnum(clazz, (String) value);
}
- private static YangInstanceIdentifier preparePath(final ContainerNode data, final QName qualifiedName) {
- final Optional<DataContainerChild<? extends PathArgument, ?>> path = data
- .getChild(new YangInstanceIdentifier.NodeIdentifier(QName.create(qualifiedName, "path")));
- Object pathValue = null;
- if (path.isPresent()) {
- pathValue = path.get().getValue();
- }
- if (!(pathValue instanceof YangInstanceIdentifier)) {
- LOG.debug("Instance identifier {} was not normalized correctly", qualifiedName);
- throw new RestconfDocumentedException("Instance identifier was not normalized correctly",
- ErrorType.APPLICATION, ErrorTag.OPERATION_FAILED);
- }
- return (YangInstanceIdentifier) pathValue;
- }
-
/**
- * Create stream with POST operation via RPC.
+ * Create YANG notification stream using notification definition in YANG schema.
*
- * @param notificatinoDefinition
- * input of RPC
- * @param refSchemaCtx
- * schemaContext
- * @param outputType
- * output type
- * @return {@link DOMRpcResult}
+ * @param notificationDefinition YANG notification definition.
+ * @param refSchemaCtx Reference to {@link SchemaContext} - {@link SchemaContextRef}.
+ * @param outputType Output type (XML or JSON).
+ * @return {@link NotificationListenerAdapter}
*/
- public static List<NotificationListenerAdapter> createYangNotifiStream(
- final NotificationDefinition notificatinoDefinition, final SchemaContextRef refSchemaCtx,
- final String outputType) {
- final List<SchemaPath> paths = new ArrayList<>();
- final QName notificatinoDefinitionQName = notificatinoDefinition.getQName();
- final Module module =
- refSchemaCtx.findModuleByNamespaceAndRevision(notificatinoDefinitionQName.getModule().getNamespace(),
- notificatinoDefinitionQName.getModule().getRevision());
- Preconditions.checkNotNull(module,
- "Module for namespace " + notificatinoDefinitionQName.getModule().getNamespace() + " does not exist");
- NotificationDefinition notifiDef = null;
- for (final NotificationDefinition notification : module.getNotifications()) {
- if (notification.getQName().equals(notificatinoDefinitionQName)) {
- notifiDef = notification;
- break;
- }
- }
- final String moduleName = module.getName();
- Preconditions.checkNotNull(notifiDef,
- "Notification " + notificatinoDefinitionQName + "doesn't exist in module " + moduleName);
- paths.add(notifiDef.getPath());
- String streamName = RestconfStreamsConstants.CREATE_NOTIFICATION_STREAM + "/";
- streamName = streamName + moduleName + ":" + notificatinoDefinitionQName.getLocalName();
- if (outputType.equals("JSON")) {
- streamName = streamName + "/JSON";
- }
+ public static NotificationListenerAdapter createYangNotifiStream(
+ final NotificationDefinition notificationDefinition, final SchemaContextRef refSchemaCtx,
+ final NotificationOutputType outputType) {
+ final String streamName = parseNotificationStreamName(requireNonNull(notificationDefinition),
+ requireNonNull(refSchemaCtx), requireNonNull(outputType.getName()));
+ final Optional<NotificationListenerAdapter> listenerForStreamName = ListenersBroker.getInstance()
+ .getNotificationListenerFor(streamName);
+ return listenerForStreamName.orElseGet(() -> ListenersBroker.getInstance().registerNotificationListener(
+ notificationDefinition.getPath(), streamName, outputType));
+ }
- if (Notificator.existNotificationListenerFor(streamName)) {
- final List<NotificationListenerAdapter> notificationListenerFor =
- Notificator.getNotificationListenerFor(streamName);
- return SubscribeToStreamUtil.pickSpecificListenerByOutput(notificationListenerFor, outputType);
+ private static String parseNotificationStreamName(final NotificationDefinition notificationDefinition,
+ final SchemaContextRef refSchemaCtx, final String outputType) {
+ final QName notificationDefinitionQName = notificationDefinition.getQName();
+ final Module module = refSchemaCtx.findModuleByNamespaceAndRevision(
+ notificationDefinitionQName.getModule().getNamespace(),
+ notificationDefinitionQName.getModule().getRevision());
+ requireNonNull(module, String.format("Module for namespace %s does not exist.",
+ notificationDefinitionQName.getModule().getNamespace()));
+
+ final StringBuilder streamNameBuilder = new StringBuilder();
+ streamNameBuilder.append(RestconfStreamsConstants.CREATE_NOTIFICATION_STREAM)
+ .append('/')
+ .append(module.getName())
+ .append(':')
+ .append(notificationDefinitionQName.getLocalName());
+ if (outputType.equals(NotificationOutputType.JSON.getName())) {
+ streamNameBuilder.append(NotificationOutputType.JSON.getName());
}
-
- return Notificator.createNotificationListener(paths, streamName, outputType);
+ return streamNameBuilder.toString();
}
-}
+}
\ No newline at end of file