X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=restconf%2Frestconf-nb-rfc8040%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Frestconf%2Fnb%2Frfc8040%2Fstreams%2Flisteners%2FListenerAdapter.java;h=b438f36b0223c50b501916933b8b4a412c40d7c3;hb=0226b5a21bb755b75a25b208fd0ecca20fd8968c;hp=7e2d89b6ffd8056a8eddc4e8e63312828d2934b4;hpb=01a6df747b35181b5cb7a1a8cf7bdd8ecd511ef6;p=netconf.git diff --git a/restconf/restconf-nb-rfc8040/src/main/java/org/opendaylight/restconf/nb/rfc8040/streams/listeners/ListenerAdapter.java b/restconf/restconf-nb-rfc8040/src/main/java/org/opendaylight/restconf/nb/rfc8040/streams/listeners/ListenerAdapter.java index 7e2d89b6ff..b438f36b02 100644 --- a/restconf/restconf-nb-rfc8040/src/main/java/org/opendaylight/restconf/nb/rfc8040/streams/listeners/ListenerAdapter.java +++ b/restconf/restconf-nb-rfc8040/src/main/java/org/opendaylight/restconf/nb/rfc8040/streams/listeners/ListenerAdapter.java @@ -7,361 +7,110 @@ */ package org.opendaylight.restconf.nb.rfc8040.streams.listeners; -import com.fasterxml.jackson.databind.JsonNode; -import com.fasterxml.jackson.dataformat.xml.XmlMapper; -import com.google.common.base.Preconditions; -import com.google.common.base.Throwables; -import java.io.IOException; -import java.util.Map; -import java.util.Map.Entry; -import java.util.Set; -import javax.xml.stream.XMLStreamException; -import javax.xml.transform.dom.DOMResult; -import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeEvent; -import org.opendaylight.controller.md.sal.dom.api.DOMDataChangeListener; +import com.google.common.annotations.VisibleForTesting; +import java.time.Instant; +import java.util.Collection; +import java.util.List; +import java.util.Optional; +import java.util.stream.Collectors; +import org.opendaylight.mdsal.common.api.LogicalDatastoreType; +import org.opendaylight.mdsal.dom.api.ClusteredDOMDataTreeChangeListener; +import org.opendaylight.mdsal.dom.api.DOMDataBroker; +import org.opendaylight.mdsal.dom.api.DOMDataTreeChangeService; +import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier; import org.opendaylight.yang.gen.v1.urn.sal.restconf.event.subscription.rev140708.NotificationOutputTypeGrouping.NotificationOutputType; -import org.opendaylight.yangtools.yang.common.QName; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; -import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifierWithPredicates; -import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeWithValue; -import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.PathArgument; -import org.opendaylight.yangtools.yang.data.api.schema.LeafNode; -import org.opendaylight.yangtools.yang.data.api.schema.MapEntryNode; -import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; -import org.opendaylight.yangtools.yang.data.api.schema.UnkeyedListEntryNode; -import org.opendaylight.yangtools.yang.data.util.DataSchemaContextTree; -import org.opendaylight.yangtools.yang.model.api.Module; -import org.opendaylight.yangtools.yang.model.api.SchemaContext; -import org.opendaylight.yangtools.yang.model.api.SchemaPath; +import org.opendaylight.yangtools.yang.data.codec.gson.JSONCodecFactorySupplier; +import org.opendaylight.yangtools.yang.data.tree.api.DataTreeCandidate; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.w3c.dom.Document; -import org.w3c.dom.Element; -import org.w3c.dom.Node; /** - * {@link ListenerAdapter} is responsible to track events, which occurred by - * changing data in data source. + * {@link ListenerAdapter} is responsible to track events, which occurred by changing data in data source. */ -public class ListenerAdapter extends AbstractCommonSubscriber implements DOMDataChangeListener { - +public class ListenerAdapter extends AbstractCommonSubscriber> + implements ClusteredDOMDataTreeChangeListener { private static final Logger LOG = LoggerFactory.getLogger(ListenerAdapter.class); - - private final YangInstanceIdentifier path; - private final String streamName; - private final NotificationOutputType outputType; - - private AsyncDataChangeEvent> change; + private static final DataTreeCandidateFormatterFactory JSON_FORMATTER_FACTORY = + JSONDataTreeCandidateFormatter.createFactory(JSONCodecFactorySupplier.RFC7951); /** - * Creates new {@link ListenerAdapter} listener specified by path and stream - * name and register for subscribing. + * Creates new {@link ListenerAdapter} listener specified by path and stream name and register for subscribing. * - * @param path - * Path to data in data store. - * @param streamName - * The name of the stream. - * @param outputType - * Type of output on notification (JSON, XML) + * @param path Path to data in data store. + * @param streamName The name of the stream. + * @param outputType Type of output on notification (JSON, XML). */ - ListenerAdapter(final YangInstanceIdentifier path, final String streamName, + @VisibleForTesting + public ListenerAdapter(final YangInstanceIdentifier path, final String streamName, final NotificationOutputType outputType) { - register(this); - setLocalNameOfPath(path.getLastPathArgument().getNodeType().getLocalName()); - - this.outputType = Preconditions.checkNotNull(outputType); - this.path = Preconditions.checkNotNull(path); - Preconditions.checkArgument((streamName != null) && !streamName.isEmpty()); - this.streamName = streamName; + super(path.getLastPathArgument().getNodeType(), streamName, path, outputType, getFormatterFactory(outputType)); } - @Override - public void onDataChanged(final AsyncDataChangeEvent> change) { - this.change = change; - final String xml = prepareXml(); - if (checkQueryParams(xml, this)) { - prepareAndPostData(xml); + private static DataTreeCandidateFormatterFactory getFormatterFactory(final NotificationOutputType outputType) { + switch (outputType) { + case JSON: + return JSON_FORMATTER_FACTORY; + case XML: + return XMLDataTreeCandidateFormatter.FACTORY; + default: + throw new IllegalArgumentException("Unsupported outputType" + outputType); } } - /** - * Gets the name of the stream. - * - * @return The name of the stream. - */ @Override - public String getStreamName() { - return this.streamName; + public void onInitialData() { + // No-op } @Override - public String getOutputType() { - return this.outputType.getName(); - } - - /** - * Get path pointed to data in data store. - * - * @return Path pointed to data in data store. - */ - public YangInstanceIdentifier getPath() { - return this.path; - } - - /** - * Prepare data of notification and data to client. - * - * @param xml data - */ - private void prepareAndPostData(final String xml) { - final Event event = new Event(EventType.NOTIFY); - if (this.outputType.equals(NotificationOutputType.JSON)) { - try { - final JsonNode node = new XmlMapper().readTree(xml.getBytes()); - event.setData(node.toString()); - } catch (final IOException e) { - LOG.error("Error parsing XML {}", xml, e); - Throwables.propagate(e); - } - } else { - event.setData(xml); - } - post(event); - } - - /** - * Tracks events of data change by customer. - */ - - /** - * Prepare data in printable form and transform it to String. - * - * @return Data in printable form. - */ - private String prepareXml() { - final SchemaContext schemaContext = schemaHandler.get(); - final DataSchemaContextTree dataContextTree = DataSchemaContextTree.from(schemaContext); - final Document doc = createDocument(); - final Element notificationElement = basePartDoc(doc); - - final Element dataChangedNotificationEventElement = doc.createElementNS( - "urn:opendaylight:params:xml:ns:yang:controller:md:sal:remote", "data-changed-notification"); - - addValuesToDataChangedNotificationEventElement(doc, dataChangedNotificationEventElement, this.change, - schemaContext, dataContextTree); - notificationElement.appendChild(dataChangedNotificationEventElement); - return transformDoc(doc); - } - - /** - * Adds values to data changed notification event element. - * - * @param doc - * {@link Document} - * @param dataChangedNotificationEventElement - * {@link Element} - * @param change - * {@link AsyncDataChangeEvent} - */ - private void addValuesToDataChangedNotificationEventElement(final Document doc, - final Element dataChangedNotificationEventElement, - final AsyncDataChangeEvent> change, - final SchemaContext schemaContext, final DataSchemaContextTree dataSchemaContextTree) { - - addCreatedChangedValuesFromDataToElement(doc, change.getCreatedData().entrySet(), - dataChangedNotificationEventElement, Operation.CREATED, schemaContext, dataSchemaContextTree); - - addCreatedChangedValuesFromDataToElement(doc, change.getUpdatedData().entrySet(), - dataChangedNotificationEventElement, Operation.UPDATED, schemaContext, dataSchemaContextTree); - - addValuesFromDataToElement(doc, change.getRemovedPaths(), dataChangedNotificationEventElement, - Operation.DELETED, schemaContext, dataSchemaContextTree); - } - - /** - * Adds values from data to element. - * - * @param doc - * {@link Document} - * @param data - * Set of {@link YangInstanceIdentifier}. - * @param element - * {@link Element} - * @param operation - * {@link Operation} - * @param schemaContext - * schema context - * @param dataSchemaContextTree - * data schema context tree - */ - private void addValuesFromDataToElement(final Document doc, final Set data, - final Element element, final Operation operation, final SchemaContext schemaContext, - final DataSchemaContextTree dataSchemaContextTree) { - if ((data == null) || data.isEmpty()) { + @SuppressWarnings("checkstyle:IllegalCatch") + public void onDataTreeChanged(final List dataTreeCandidates) { + final Instant now = Instant.now(); + if (!checkStartStop(now)) { return; } - for (final YangInstanceIdentifier path : data) { - if (!dataSchemaContextTree.getChild(path).isMixin()) { - final Node node = createDataChangeEventElement(doc, path, operation, schemaContext); - element.appendChild(node); - } - } - } - private void addCreatedChangedValuesFromDataToElement(final Document doc, - final Set>> data, final Element element, - final Operation operation, final SchemaContext schemaContext, - final DataSchemaContextTree dataSchemaContextTree) { - if ((data == null) || data.isEmpty()) { + final Optional maybeData; + try { + maybeData = formatter().eventData(schemaHandler.get(), dataTreeCandidates, now, getLeafNodesOnly(), + isSkipNotificationData()); + } catch (final Exception e) { + LOG.error("Failed to process notification {}", + dataTreeCandidates.stream().map(Object::toString).collect(Collectors.joining(",")), e); return; } - for (final Entry> entry : data) { - if (!dataSchemaContextTree.getChild(entry.getKey()).isMixin() - && (!getLeafNodesOnly() || entry.getValue() instanceof LeafNode)) { - final Node node = createCreatedChangedDataChangeEventElement(doc, entry, operation, schemaContext, - dataSchemaContextTree); - element.appendChild(node); - } - } - } - - /** - * Creates changed event element from data. - * - * @param doc - * {@link Document} - * @param path - * Path to data in data store. - * @param operation - * {@link Operation} - * @param schemaContext - * schema context - * @return {@link Node} node represented by changed event element. - */ - private Node createDataChangeEventElement(final Document doc, final YangInstanceIdentifier path, - final Operation operation, final SchemaContext schemaContext) { - final Element dataChangeEventElement = doc.createElement("data-change-event"); - final Element pathElement = doc.createElement("path"); - addPathAsValueToElement(path, pathElement, schemaContext); - dataChangeEventElement.appendChild(pathElement); - - final Element operationElement = doc.createElement("operation"); - operationElement.setTextContent(operation.value); - dataChangeEventElement.appendChild(operationElement); - - return dataChangeEventElement; - } - private Node createCreatedChangedDataChangeEventElement(final Document doc, - final Entry> entry, final Operation operation, - final SchemaContext schemaContext, final DataSchemaContextTree dataSchemaContextTree) { - final Element dataChangeEventElement = doc.createElement("data-change-event"); - final Element pathElement = doc.createElement("path"); - final YangInstanceIdentifier path = entry.getKey(); - addPathAsValueToElement(path, pathElement, schemaContext); - dataChangeEventElement.appendChild(pathElement); - - final Element operationElement = doc.createElement("operation"); - operationElement.setTextContent(operation.value); - dataChangeEventElement.appendChild(operationElement); - - try { - SchemaPath nodePath; - final NormalizedNode normalized = entry.getValue(); - if ((normalized instanceof MapEntryNode) || (normalized instanceof UnkeyedListEntryNode)) { - nodePath = dataSchemaContextTree.getChild(path).getDataSchemaNode().getPath(); - } else { - nodePath = dataSchemaContextTree.getChild(path).getDataSchemaNode().getPath().getParent(); - } - final DOMResult domResult = writeNormalizedNode(normalized, schemaContext, nodePath); - final Node result = doc.importNode(domResult.getNode().getFirstChild(), true); - final Element dataElement = doc.createElement("data"); - dataElement.appendChild(result); - dataChangeEventElement.appendChild(dataElement); - } catch (final IOException e) { - LOG.error("Error in writer ", e); - } catch (final XMLStreamException e) { - LOG.error("Error processing stream", e); + if (maybeData.isPresent()) { + post(maybeData.get()); } - - return dataChangeEventElement; } /** - * Adds path as value to element. + * Get path pointed to data in data store. * - * @param path - * Path to data in data store. - * @param element - * {@link Element} - * @param schemaContext - * schema context + * @return Path pointed to data in data store. */ - @SuppressWarnings("rawtypes") - private void addPathAsValueToElement(final YangInstanceIdentifier path, final Element element, - final SchemaContext schemaContext) { - final StringBuilder textContent = new StringBuilder(); - - for (final PathArgument pathArgument : path.getPathArguments()) { - if (pathArgument instanceof YangInstanceIdentifier.AugmentationIdentifier) { - continue; - } - textContent.append("/"); - writeIdentifierWithNamespacePrefix(element, textContent, pathArgument.getNodeType(), schemaContext); - if (pathArgument instanceof NodeIdentifierWithPredicates) { - final Map predicates = ((NodeIdentifierWithPredicates) pathArgument).getKeyValues(); - for (final QName keyValue : predicates.keySet()) { - final String predicateValue = String.valueOf(predicates.get(keyValue)); - textContent.append("["); - writeIdentifierWithNamespacePrefix(element, textContent, keyValue, schemaContext); - textContent.append("='"); - textContent.append(predicateValue); - textContent.append("'"); - textContent.append("]"); - } - } else if (pathArgument instanceof NodeWithValue) { - textContent.append("[.='"); - textContent.append(((NodeWithValue) pathArgument).getValue()); - textContent.append("'"); - textContent.append("]"); - } - } - element.setTextContent(textContent.toString()); + public YangInstanceIdentifier getPath() { + return path(); } /** - * Writes identifier that consists of prefix and QName. + * Register data change listener in DOM data broker and set it to listener on stream. * - * @param element - * {@link Element} - * @param textContent - * StringBuilder - * @param qualifiedName - * QName - * @param schemaContext - * schema context + * @param domDataBroker data broker for register data change listener + * @param datastore {@link LogicalDatastoreType} */ - private static void writeIdentifierWithNamespacePrefix(final Element element, final StringBuilder textContent, - final QName qualifiedName, final SchemaContext schemaContext) { - final Module module = schemaContext.findModuleByNamespaceAndRevision(qualifiedName.getNamespace(), - qualifiedName.getRevision()); - - textContent.append(module.getName()); - textContent.append(":"); - textContent.append(qualifiedName.getLocalName()); - } - - /** - * Consists of three types {@link Operation#CREATED}, - * {@link Operation#UPDATED} and {@link Operation#DELETED}. - */ - private enum Operation { - CREATED("created"), UPDATED("updated"), DELETED("deleted"); - - private final String value; + public final synchronized void listen(final DOMDataBroker domDataBroker, final LogicalDatastoreType datastore) { + if (!isListening()) { + final DOMDataTreeChangeService changeService = domDataBroker.getExtensions() + .getInstance(DOMDataTreeChangeService.class); + if (changeService == null) { + throw new UnsupportedOperationException("DOMDataBroker does not support the DOMDataTreeChangeService"); + } - Operation(final String value) { - this.value = value; + setRegistration(changeService.registerDataTreeChangeListener( + new DOMDataTreeIdentifier(datastore, getPath()), this)); } } }