*/
package org.opendaylight.restconf.nb.rfc8040.streams.listeners;
-import com.fasterxml.jackson.databind.JsonNode;
-import com.fasterxml.jackson.dataformat.xml.XmlMapper;
-import com.google.common.base.Preconditions;
-import com.google.common.base.Throwables;
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkState;
+import static java.util.Objects.requireNonNull;
+
+import com.google.common.base.MoreObjects;
import java.io.IOException;
-import java.util.Map;
+import java.time.Instant;
+import java.util.Collection;
import java.util.Map.Entry;
-import java.util.Set;
+import java.util.Optional;
import javax.xml.stream.XMLStreamException;
import javax.xml.transform.dom.DOMResult;
-import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeEvent;
-import org.opendaylight.controller.md.sal.dom.api.DOMDataChangeListener;
+import org.json.XML;
+import org.opendaylight.mdsal.dom.api.ClusteredDOMDataTreeChangeListener;
import org.opendaylight.yang.gen.v1.urn.sal.restconf.event.subscription.rev140708.NotificationOutputTypeGrouping.NotificationOutputType;
import org.opendaylight.yangtools.yang.common.QName;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
import org.opendaylight.yangtools.yang.data.api.schema.MapEntryNode;
import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
import org.opendaylight.yangtools.yang.data.api.schema.UnkeyedListEntryNode;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidateNode;
+import org.opendaylight.yangtools.yang.data.util.DataSchemaContextNode;
import org.opendaylight.yangtools.yang.data.util.DataSchemaContextTree;
import org.opendaylight.yangtools.yang.model.api.Module;
import org.opendaylight.yangtools.yang.model.api.SchemaContext;
import org.w3c.dom.Node;
/**
- * {@link ListenerAdapter} is responsible to track events, which occurred by
- * changing data in data source.
+ * {@link ListenerAdapter} is responsible to track events, which occurred by changing data in data source.
*/
-public class ListenerAdapter extends AbstractCommonSubscriber implements DOMDataChangeListener {
+public class ListenerAdapter extends AbstractCommonSubscriber implements ClusteredDOMDataTreeChangeListener {
private static final Logger LOG = LoggerFactory.getLogger(ListenerAdapter.class);
private final String streamName;
private final NotificationOutputType outputType;
- private AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> change;
-
/**
- * Creates new {@link ListenerAdapter} listener specified by path and stream
- * name and register for subscribing.
+ * Creates new {@link ListenerAdapter} listener specified by path and stream name and register for subscribing.
*
- * @param path
- * Path to data in data store.
- * @param streamName
- * The name of the stream.
- * @param outputType
- * Type of output on notification (JSON, XML)
+ * @param path Path to data in data store.
+ * @param streamName The name of the stream.
+ * @param outputType Type of output on notification (JSON, XML).
*/
ListenerAdapter(final YangInstanceIdentifier path, final String streamName,
final NotificationOutputType outputType) {
- super();
- register(this);
setLocalNameOfPath(path.getLastPathArgument().getNodeType().getLocalName());
- this.outputType = Preconditions.checkNotNull(outputType);
- this.path = Preconditions.checkNotNull(path);
- Preconditions.checkArgument((streamName != null) && !streamName.isEmpty());
- this.streamName = streamName;
+ this.outputType = requireNonNull(outputType);
+ this.path = requireNonNull(path);
+ this.streamName = requireNonNull(streamName);
+ checkArgument(!streamName.isEmpty());
}
@Override
- public void onDataChanged(final AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> change) {
- this.change = change;
- final String xml = prepareXml();
- if (checkQueryParams(xml, this)) {
+ public void onDataTreeChanged(final Collection<DataTreeCandidate> dataTreeCandidates) {
+ final Instant now = Instant.now();
+ if (!checkStartStop(now, this)) {
+ return;
+ }
+
+ final String xml = prepareXml(dataTreeCandidates);
+ if (checkFilter(xml)) {
prepareAndPostData(xml);
}
}
/**
* Prepare data of notification and data to client.
*
- * @param xml data
+ * @param xml XML-formatted data.
*/
private void prepareAndPostData(final String xml) {
- final Event event = new Event(EventType.NOTIFY);
if (this.outputType.equals(NotificationOutputType.JSON)) {
- try {
- final JsonNode node = new XmlMapper().readTree(xml.getBytes());
- event.setData(node.toString());
- } catch (final IOException e) {
- LOG.error("Error parsing XML {}", xml, e);
- Throwables.propagate(e);
- }
+ post(XML.toJSONObject(xml).toString());
} else {
- event.setData(xml);
+ post(xml);
}
- post(event);
}
- /**
- * Tracks events of data change by customer.
- */
-
/**
* Prepare data in printable form and transform it to String.
*
+ * @param dataTreeCandidates Data-tree candidates to be transformed.
* @return Data in printable form.
*/
- private String prepareXml() {
+ private String prepareXml(final Collection<DataTreeCandidate> dataTreeCandidates) {
final SchemaContext schemaContext = schemaHandler.get();
final DataSchemaContextTree dataContextTree = DataSchemaContextTree.from(schemaContext);
final Document doc = createDocument();
final Element dataChangedNotificationEventElement = doc.createElementNS(
"urn:opendaylight:params:xml:ns:yang:controller:md:sal:remote", "data-changed-notification");
- addValuesToDataChangedNotificationEventElement(doc, dataChangedNotificationEventElement, this.change,
+ addValuesToDataChangedNotificationEventElement(doc, dataChangedNotificationEventElement, dataTreeCandidates,
schemaContext, dataContextTree);
notificationElement.appendChild(dataChangedNotificationEventElement);
return transformDoc(doc);
/**
* Adds values to data changed notification event element.
- *
- * @param doc
- * {@link Document}
- * @param dataChangedNotificationEventElement
- * {@link Element}
- * @param change
- * {@link AsyncDataChangeEvent}
*/
+ @SuppressWarnings("checkstyle:hiddenField")
private void addValuesToDataChangedNotificationEventElement(final Document doc,
- final Element dataChangedNotificationEventElement,
- final AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> change,
+ final Element dataChangedNotificationEventElement, final Collection<DataTreeCandidate> dataTreeCandidates,
final SchemaContext schemaContext, final DataSchemaContextTree dataSchemaContextTree) {
- addCreatedChangedValuesFromDataToElement(doc, change.getCreatedData().entrySet(),
- dataChangedNotificationEventElement, Operation.CREATED, schemaContext, dataSchemaContextTree);
-
- addCreatedChangedValuesFromDataToElement(doc, change.getUpdatedData().entrySet(),
- dataChangedNotificationEventElement, Operation.UPDATED, schemaContext, dataSchemaContextTree);
-
- addValuesFromDataToElement(doc, change.getRemovedPaths(), dataChangedNotificationEventElement,
- Operation.DELETED, schemaContext, dataSchemaContextTree);
- }
-
- /**
- * Adds values from data to element.
- *
- * @param doc
- * {@link Document}
- * @param data
- * Set of {@link YangInstanceIdentifier}.
- * @param element
- * {@link Element}
- * @param operation
- * {@link Operation}
- * @param schemaContext
- * @param dataSchemaContextTree
- */
- private void addValuesFromDataToElement(final Document doc, final Set<YangInstanceIdentifier> data,
- final Element element, final Operation operation, final SchemaContext schemaContext,
- final DataSchemaContextTree dataSchemaContextTree) {
- if ((data == null) || data.isEmpty()) {
- return;
- }
- for (final YangInstanceIdentifier path : data) {
- if (!dataSchemaContextTree.getChild(path).isMixin()) {
- final Node node = createDataChangeEventElement(doc, path, operation, schemaContext);
- element.appendChild(node);
+ for (DataTreeCandidate dataTreeCandidate : dataTreeCandidates) {
+ DataTreeCandidateNode candidateNode = dataTreeCandidate.getRootNode();
+ if (candidateNode == null) {
+ continue;
}
+ YangInstanceIdentifier yiid = dataTreeCandidate.getRootPath();
+ addNodeToDataChangeNotificationEventElement(doc, dataChangedNotificationEventElement, candidateNode,
+ yiid.getParent(), schemaContext, dataSchemaContextTree);
}
}
- private void addCreatedChangedValuesFromDataToElement(final Document doc,
- final Set<Entry<YangInstanceIdentifier, NormalizedNode<?, ?>>> data, final Element element,
- final Operation operation, final SchemaContext schemaContext,
+ private void addNodeToDataChangeNotificationEventElement(final Document doc,
+ final Element dataChangedNotificationEventElement, final DataTreeCandidateNode candidateNode,
+ final YangInstanceIdentifier parentYiid, final SchemaContext schemaContext,
final DataSchemaContextTree dataSchemaContextTree) {
- if ((data == null) || data.isEmpty()) {
+
+ Optional<NormalizedNode<?, ?>> optionalNormalizedNode = Optional.empty();
+ switch (candidateNode.getModificationType()) {
+ case APPEARED:
+ case SUBTREE_MODIFIED:
+ case WRITE:
+ optionalNormalizedNode = candidateNode.getDataAfter();
+ break;
+ case DELETE:
+ case DISAPPEARED:
+ optionalNormalizedNode = candidateNode.getDataBefore();
+ break;
+ case UNMODIFIED:
+ default:
+ break;
+ }
+
+ if (!optionalNormalizedNode.isPresent()) {
+ LOG.error("No node present in notification for {}", candidateNode);
return;
}
- for (final Entry<YangInstanceIdentifier, NormalizedNode<?, ?>> entry : data) {
- if (!dataSchemaContextTree.getChild(entry.getKey()).isMixin()
- && (!getLeafNodesOnly() || entry.getValue() instanceof LeafNode)) {
- final Node node = createCreatedChangedDataChangeEventElement(doc, entry, operation, schemaContext,
- dataSchemaContextTree);
- element.appendChild(node);
+
+ NormalizedNode<?, ?> normalizedNode = optionalNormalizedNode.get();
+ YangInstanceIdentifier yiid = YangInstanceIdentifier.builder(parentYiid)
+ .append(normalizedNode.getIdentifier()).build();
+
+ final Optional<DataSchemaContextNode<?>> childrenSchemaNode = dataSchemaContextTree.findChild(yiid);
+ checkState(childrenSchemaNode.isPresent());
+ boolean isNodeMixin = childrenSchemaNode.get().isMixin();
+ boolean isSkippedNonLeaf = getLeafNodesOnly() && !(normalizedNode instanceof LeafNode);
+ if (!isNodeMixin && !isSkippedNonLeaf) {
+ Node node = null;
+ switch (candidateNode.getModificationType()) {
+ case APPEARED:
+ case SUBTREE_MODIFIED:
+ case WRITE:
+ Operation op = candidateNode.getDataBefore().isPresent() ? Operation.UPDATED : Operation.CREATED;
+ node = createCreatedChangedDataChangeEventElement(doc, yiid, normalizedNode, op,
+ schemaContext, dataSchemaContextTree);
+ break;
+ case DELETE:
+ case DISAPPEARED:
+ node = createDataChangeEventElement(doc, yiid, schemaContext);
+ break;
+ case UNMODIFIED:
+ default:
+ break;
+ }
+ if (node != null) {
+ dataChangedNotificationEventElement.appendChild(node);
}
}
+
+ for (DataTreeCandidateNode childNode : candidateNode.getChildNodes()) {
+ addNodeToDataChangeNotificationEventElement(
+ doc, dataChangedNotificationEventElement, childNode, yiid, schemaContext, dataSchemaContextTree);
+ }
}
/**
- * Creates changed event element from data.
+ * Creates data-changed event element from data.
*
- * @param doc
- * {@link Document}
- * @param path
- * Path to data in data store.
- * @param operation
- * {@link Operation}
- * @param schemaContext
- * @return {@link Node} node represented by changed event element.
+ * @param doc {@link Document}
+ * @param schemaContext Schema context.
+ * @return {@link Node} represented by changed event element.
*/
- private Node createDataChangeEventElement(final Document doc, final YangInstanceIdentifier path,
- final Operation operation, final SchemaContext schemaContext) {
+ private Node createDataChangeEventElement(final Document doc, final YangInstanceIdentifier eventPath,
+ final SchemaContext schemaContext) {
final Element dataChangeEventElement = doc.createElement("data-change-event");
final Element pathElement = doc.createElement("path");
- addPathAsValueToElement(path, pathElement, schemaContext);
+ addPathAsValueToElement(eventPath, pathElement, schemaContext);
dataChangeEventElement.appendChild(pathElement);
final Element operationElement = doc.createElement("operation");
- operationElement.setTextContent(operation.value);
+ operationElement.setTextContent(Operation.DELETED.value);
dataChangeEventElement.appendChild(operationElement);
return dataChangeEventElement;
}
- private Node createCreatedChangedDataChangeEventElement(final Document doc,
- final Entry<YangInstanceIdentifier, NormalizedNode<?, ?>> entry, final Operation operation,
- final SchemaContext schemaContext, final DataSchemaContextTree dataSchemaContextTree) {
+ private Node createCreatedChangedDataChangeEventElement(final Document doc, final YangInstanceIdentifier eventPath,
+ final NormalizedNode<?, ?> normalized, final Operation operation, final SchemaContext schemaContext,
+ final DataSchemaContextTree dataSchemaContextTree) {
final Element dataChangeEventElement = doc.createElement("data-change-event");
final Element pathElement = doc.createElement("path");
- final YangInstanceIdentifier path = entry.getKey();
- addPathAsValueToElement(path, pathElement, schemaContext);
+ addPathAsValueToElement(eventPath, pathElement, schemaContext);
dataChangeEventElement.appendChild(pathElement);
final Element operationElement = doc.createElement("operation");
try {
SchemaPath nodePath;
- final NormalizedNode<?, ?> normalized = entry.getValue();
- if ((normalized instanceof MapEntryNode) || (normalized instanceof UnkeyedListEntryNode)) {
- nodePath = dataSchemaContextTree.getChild(path).getDataSchemaNode().getPath();
+ final Optional<DataSchemaContextNode<?>> childrenSchemaNode = dataSchemaContextTree.findChild(eventPath);
+ checkState(childrenSchemaNode.isPresent());
+ if (normalized instanceof MapEntryNode || normalized instanceof UnkeyedListEntryNode) {
+ nodePath = childrenSchemaNode.get().getDataSchemaNode().getPath();
} else {
- nodePath = dataSchemaContextTree.getChild(path).getDataSchemaNode().getPath().getParent();
+ nodePath = childrenSchemaNode.get().getDataSchemaNode().getPath().getParent();
}
final DOMResult domResult = writeNormalizedNode(normalized, schemaContext, nodePath);
final Node result = doc.importNode(domResult.getNode().getFirstChild(), true);
/**
* Adds path as value to element.
*
- * @param path
- * Path to data in data store.
- * @param element
- * {@link Element}
- * @param schemaContext
+ * @param eventPath Path to data in data store.
+ * @param element {@link Element}
+ * @param schemaContext Schema context.
*/
@SuppressWarnings("rawtypes")
- private void addPathAsValueToElement(final YangInstanceIdentifier path, final Element element,
+ private void addPathAsValueToElement(final YangInstanceIdentifier eventPath, final Element element,
final SchemaContext schemaContext) {
final StringBuilder textContent = new StringBuilder();
- for (final PathArgument pathArgument : path.getPathArguments()) {
+ for (final PathArgument pathArgument : eventPath.getPathArguments()) {
if (pathArgument instanceof YangInstanceIdentifier.AugmentationIdentifier) {
continue;
}
textContent.append("/");
- writeIdentifierWithNamespacePrefix(element, textContent, pathArgument.getNodeType(), schemaContext);
+ writeIdentifierWithNamespacePrefix(textContent, pathArgument.getNodeType(), schemaContext);
if (pathArgument instanceof NodeIdentifierWithPredicates) {
- final Map<QName, Object> predicates = ((NodeIdentifierWithPredicates) pathArgument).getKeyValues();
- for (final QName keyValue : predicates.keySet()) {
- final String predicateValue = String.valueOf(predicates.get(keyValue));
+ for (final Entry<QName, Object> entry : ((NodeIdentifierWithPredicates) pathArgument).entrySet()) {
+ final QName keyValue = entry.getKey();
+ final String predicateValue = String.valueOf(entry.getValue());
textContent.append("[");
- writeIdentifierWithNamespacePrefix(element, textContent, keyValue, schemaContext);
+ writeIdentifierWithNamespacePrefix(textContent, keyValue, schemaContext);
textContent.append("='");
textContent.append(predicateValue);
textContent.append("'");
/**
* Writes identifier that consists of prefix and QName.
*
- * @param element
- * {@link Element}
- * @param textContent
- * StringBuilder
- * @param qualifiedName
- * QName
- * @param schemaContext
+ * @param textContent Text builder that should be supplemented by QName and its modules name.
+ * @param qualifiedName QName of the element.
+ * @param schemaContext Schema context that holds modules which should contain module specified in QName.
*/
- private static void writeIdentifierWithNamespacePrefix(final Element element, final StringBuilder textContent,
- final QName qualifiedName, final SchemaContext schemaContext) {
- final Module module = schemaContext.findModuleByNamespaceAndRevision(qualifiedName.getNamespace(),
- qualifiedName.getRevision());
-
- textContent.append(module.getName());
- textContent.append(":");
- textContent.append(qualifiedName.getLocalName());
+ private static void writeIdentifierWithNamespacePrefix(final StringBuilder textContent, final QName qualifiedName,
+ final SchemaContext schemaContext) {
+ final Optional<Module> module = schemaContext.findModule(qualifiedName.getModule());
+ if (module.isPresent()) {
+ textContent.append(module.get().getName());
+ textContent.append(":");
+ textContent.append(qualifiedName.getLocalName());
+ } else {
+ LOG.error("Cannot write identifier with namespace prefix in data-change listener adapter: "
+ + "Cannot find module in schema context for input QName {}.", qualifiedName);
+ throw new IllegalStateException(String.format("Cannot find module in schema context for input QName %s.",
+ qualifiedName));
+ }
}
/**
- * Consists of three types {@link Operation#CREATED},
- * {@link Operation#UPDATED} and {@link Operation#DELETED}.
+ * Consists of three types {@link Operation#CREATED}, {@link Operation#UPDATED} and {@link Operation#DELETED}.
*/
private enum Operation {
- CREATED("created"), UPDATED("updated"), DELETED("deleted");
+ CREATED("created"),
+ UPDATED("updated"),
+ DELETED("deleted");
private final String value;
this.value = value;
}
}
-}
+
+ @Override
+ public String toString() {
+ return MoreObjects.toStringHelper(this)
+ .add("path", path)
+ .add("stream-name", streamName)
+ .add("output-type", outputType)
+ .toString();
+ }
+}
\ No newline at end of file